You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/02/20 02:39:45 UTC

svn commit: r1570039 - in /hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/...

Author: edwardyoon
Date: Thu Feb 20 01:39:45 2014
New Revision: 1570039

URL: http://svn.apache.org/r1570039
Log:
HAMA-870: Runtime message compression in Bundle (edwardyoon)

Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/CompressableMessageManager.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 20 01:39:45 2014
@@ -30,6 +30,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-870: Runtime message compression in Bundle (edwardyoon)
    HAMA-869: Add Cloudera repository in maven pom file (Saisai Shao via edwardyoon)
    HAMA-856: Optimize BSPMessageBundle (edwardyoon)
    HAMA-853: Refactor Outgoing message manager (edwardyoon)

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Thu Feb 20 01:39:45 2014
@@ -253,13 +253,14 @@
   <!-- The message compression is not recommended for a large production environment -->
   <property>
     <name>hama.messenger.compression.class</name>
-    <value></value>
+    <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
     <description>The message compression algorithm to choose. Default is null.</description>
   </property>
   <property>
     <name>hama.messenger.compression.threshold</name>
-    <value></value>
-    <description>The Compressor threshold sets the level at which compression begins.</description>
+    <value>128</value>
+    <description>The Compressor threshold sets the level at which compression begins. 
+    The default is 128 bytes.</description>
   </property>
   
   <property>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Feb 20 01:39:45 2014
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 
 /**
  * BSPMessageBundle stores a group of messages so that they can be sent in batch
@@ -41,21 +42,42 @@ public class BSPMessageBundle<M extends 
 
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
+  private BSPMessageCompressor<M> compressor = null;
+  private long threshold = 128;
+
   private String className = null;
   private int bundleSize = 0;
+  private int bundleLength = 0;
+
+  ByteArrayOutputStream byteBuffer = null;
+  DataOutputStream bufferDos = null;
 
-  ByteArrayOutputStream bos = null;
-  DataOutputStream dos = null;
   ByteArrayInputStream bis = null;
   DataInputStream dis = null;
 
   public BSPMessageBundle() {
-    bos = new ByteArrayOutputStream();
-    dos = new DataOutputStream(bos);
+    byteBuffer = new ByteArrayOutputStream();
+    bufferDos = new DataOutputStream(byteBuffer);
 
     bundleSize = 0;
+    bundleLength = 0;
+  }
+
+  ByteArrayOutputStream mbos = null;
+  DataOutputStream mdos = null;
+  ByteArrayInputStream mbis = null;
+  DataInputStream mdis = null;
+
+  public byte[] serialize(M message) throws IOException {
+    mbos = new ByteArrayOutputStream();
+    mdos = new DataOutputStream(mbos);
+    message.write(mdos);
+    return mbos.toByteArray();
   }
 
+  private byte[] compressed;
+  private byte[] serialized;
+
   /**
    * Add message to this bundle.
    * 
@@ -63,7 +85,21 @@ public class BSPMessageBundle<M extends 
    */
   public void addMessage(M message) {
     try {
-      message.write(dos);
+      serialized = serialize(message);
+
+      if (compressor != null && serialized.length > threshold) {
+        bufferDos.writeBoolean(true);
+        compressed = compressor.compress(serialized);
+        bufferDos.writeInt(compressed.length);
+        bufferDos.write(compressed);
+
+        bundleLength += compressed.length;
+      } else {
+        bufferDos.writeBoolean(false);
+        bufferDos.write(serialized);
+
+        bundleLength += serialized.length;
+      }
     } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
@@ -76,11 +112,12 @@ public class BSPMessageBundle<M extends 
   }
 
   public Iterator<M> iterator() {
-    bis = new ByteArrayInputStream(bos.toByteArray());
+    bis = new ByteArrayInputStream(byteBuffer.toByteArray());
     dis = new DataInputStream(bis);
-    
+
     Iterator<M> it = new Iterator<M>() {
       M msg;
+      byte[] decompressed;
 
       @Override
       public boolean hasNext() {
@@ -98,6 +135,13 @@ public class BSPMessageBundle<M extends 
       @SuppressWarnings("unchecked")
       @Override
       public M next() {
+        boolean isCompressed = false;
+        try {
+          isCompressed = dis.readBoolean();
+        } catch (IOException e1) {
+          e1.printStackTrace();
+        }
+
         Class<M> clazz = null;
         try {
           clazz = (Class<M>) Class.forName(className);
@@ -107,7 +151,19 @@ public class BSPMessageBundle<M extends 
         msg = ReflectionUtils.newInstance(clazz, null);
 
         try {
-          msg.readFields(dis);
+          if (isCompressed) {
+            int length = dis.readInt();
+            compressed = new byte[length];
+            dis.readFully(compressed);
+            decompressed = compressor.decompress(compressed);
+
+            mbis = new ByteArrayInputStream(decompressed);
+            mdis = new DataInputStream(mbis);
+            msg.readFields(mdis);
+          } else {
+            msg.readFields(dis);
+          }
+
         } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
@@ -128,12 +184,17 @@ public class BSPMessageBundle<M extends 
     return bundleSize;
   }
 
+  public void setCompressor(BSPMessageCompressor<M> compressor, long threshold) {
+    this.compressor = compressor;
+    this.threshold = threshold;
+  }
+
   /**
-   * @return the byte length of bundle object
+   * @return the byte length of messages
    * @throws IOException
    */
   public long getLength() throws IOException {
-    return bos.toByteArray().length;
+    return bundleLength;
   }
 
   @Override
@@ -141,7 +202,7 @@ public class BSPMessageBundle<M extends 
     out.writeInt(bundleSize);
     if (bundleSize > 0) {
       out.writeUTF(className);
-      byte[] messages = bos.toByteArray();
+      byte[] messages = byteBuffer.toByteArray();
       out.writeInt(messages.length);
       out.write(messages);
     }
@@ -155,7 +216,7 @@ public class BSPMessageBundle<M extends 
       int bytesLength = in.readInt();
       byte[] temp = new byte[bytesLength];
       in.readFully(temp);
-      dos.write(temp);
+      bufferDos.write(temp);
     }
   }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Feb 20 01:39:45 2014
@@ -341,7 +341,7 @@ public class LocalBSPRunner implements J
 
     @Override
     public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-        Configuration conf, InetSocketAddress peerAddress) {
+        HamaConfiguration conf, InetSocketAddress peerAddress) {
       super.init(attemptId, peer, conf, peerAddress);
       MANAGER_MAP.put(peerAddress, this);
       selfAddress = peerAddress;
@@ -351,8 +351,13 @@ public class LocalBSPRunner implements J
     @Override
     public void transfer(InetSocketAddress addr, BSPMessageBundle<M> bundle)
         throws IOException {
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
+          bundle.getLength());
+      bundle.setCompressor(compressor,
+          conf.getLong("hama.messenger.compression.threshold", 512));
+
       Iterator<M> it = bundle.iterator();
-      while(it.hasNext()) {
+      while (it.hasNext()) {
         MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
         peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
             1L);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Feb 20 01:39:45 2014
@@ -30,10 +30,13 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
 import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
@@ -72,6 +75,8 @@ public abstract class AbstractMessageMan
   // List of listeners for all the sent messages
   protected Queue<MessageEventListener<M>> messageListenerQueue;
 
+  protected BSPMessageCompressor<M> compressor;
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.MessageManager#init(org.apache.hama.bsp.
@@ -80,7 +85,7 @@ public abstract class AbstractMessageMan
    */
   @Override
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-      Configuration conf, InetSocketAddress peerAddress) {
+      HamaConfiguration conf, InetSocketAddress peerAddress) {
     this.messageListenerQueue = new LinkedList<MessageEventListener<M>>();
     this.attemptId = attemptId;
     this.peer = peer;
@@ -88,8 +93,10 @@ public abstract class AbstractMessageMan
     this.localQueue = getReceiverQueue();
     this.localQueueForNextIteration = getSynchronizedReceiverQueue();
     this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
+
+    this.compressor = new BSPMessageCompressorFactory<M>().getCompressor(conf);
     this.outgoingMessageManager = getOutgoingMessageManager();
-    this.outgoingMessageManager.init(conf);
+    this.outgoingMessageManager.init(conf, compressor);
   }
 
   /*
@@ -271,8 +278,10 @@ public abstract class AbstractMessageMan
   }
 
   @Override
-  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
-      throws IOException {
+  public void loopBackMessages(BSPMessageBundle<M> bundle) throws IOException {
+    bundle.setCompressor(compressor,
+        conf.getLong("hama.messenger.compression.threshold", 128));
+
     Iterator<? extends Writable> it = bundle.iterator();
     while (it.hasNext()) {
       loopBackMessage(it.next());

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManager.java Thu Feb 20 01:39:45 2014
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
-import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 
 /**
@@ -47,12 +46,4 @@ public interface HamaMessageManager<M ex
    */
   public void put(BSPMessageBundle<M> messages) throws IOException;
 
-  /**
-   * This method puts a compressed message bundle for the next iteration.
-   * Accessed concurrently from protocol, this must be sychronized internally.
-   * 
-   * @param compMsgBundle
-   */
-  public void put(BSPCompressedBundle compMsgBundle) throws IOException;
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/HamaMessageManagerImpl.java Thu Feb 20 01:39:45 2014
@@ -26,11 +26,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.compress.BSPCompressedBundle;
 import org.apache.hama.ipc.HamaRPCProtocolVersion;
 import org.apache.hama.ipc.RPC;
 import org.apache.hama.ipc.RPC.Server;
@@ -41,7 +41,7 @@ import org.apache.hama.util.LRUCache;
  * 
  */
 public final class HamaMessageManagerImpl<M extends Writable> extends
-    CompressableMessageManager<M> implements HamaMessageManager<M> {
+    AbstractMessageManager<M> implements HamaMessageManager<M> {
 
   private static final Log LOG = LogFactory
       .getLog(HamaMessageManagerImpl.class);
@@ -53,9 +53,8 @@ public final class HamaMessageManagerImp
   @SuppressWarnings("serial")
   @Override
   public final void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-      Configuration conf, InetSocketAddress peerAddress) {
+      HamaConfiguration conf, InetSocketAddress peerAddress) {
     super.init(attemptId, peer, conf, peerAddress);
-    super.initCompression(conf);
     startRPCServer(conf, peerAddress);
     peersLRUCache = new LRUCache<InetSocketAddress, HamaMessageManager<M>>(
         maxCachedConnections) {
@@ -119,16 +118,8 @@ public final class HamaMessageManagerImp
       throw new IllegalArgumentException("Can not find " + addr.toString()
           + " to transfer messages to!");
     } else {
-      if (compressor != null
-          && (bundle.getLength() > conf.getLong(
-              "hama.messenger.compression.threshold", 1048576))) {
-
-        BSPCompressedBundle compMsgBundle = compressor.compressBundle(bundle);
-        bspPeerConnection.put(compMsgBundle);
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.COMPRESSED_MESSAGES, 1L);
-      } else {
-        bspPeerConnection.put(bundle);
-      }
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED, bundle.getLength());
+      bspPeerConnection.put(bundle);
     }
   }
 
@@ -165,12 +156,6 @@ public final class HamaMessageManagerImp
   }
 
   @Override
-  public final void put(BSPCompressedBundle compMsgBundle) throws IOException {
-    BSPMessageBundle<M> bundle = compressor.decompressBundle(compMsgBundle);
-    loopBackMessages(bundle);
-  }
-
-  @Override
   public final long getProtocolVersion(String arg0, long arg1)
       throws IOException {
     return versionID;
@@ -183,4 +168,5 @@ public final class HamaMessageManagerImp
     }
     return null;
   }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Thu Feb 20 01:39:45 2014
@@ -22,8 +22,8 @@ import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.TaskAttemptID;
@@ -50,7 +50,7 @@ public interface MessageManager<M extend
    * 
    */
   public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, M> peer,
-      Configuration conf, InetSocketAddress peerAddress);
+      HamaConfiguration conf, InetSocketAddress peerAddress);
 
   /**
    * Close is called after a task ran. Should be used to cleanup things e.G.
@@ -99,7 +99,7 @@ public interface MessageManager<M extend
   /**
    * Send the messages to self to receive in the next superstep.
    */
-  public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle)
+  public void loopBackMessages(BSPMessageBundle<M> bundle)
       throws IOException;
 
   /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingMessageManager.java Thu Feb 20 01:39:45 2014
@@ -21,19 +21,19 @@ import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 
 public interface OutgoingMessageManager<M extends Writable> {
 
-  public void init(Configuration conf);
-  
+  public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor);
+
   public void addMessage(String peerName, M msg);
 
   public void clear();
 
   public Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> getBundleIterator();
 
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/OutgoingPOJOMessageBundle.java Thu Feb 20 01:39:45 2014
@@ -22,30 +22,34 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.util.BSPNetUtils;
 import org.apache.hama.util.ReflectionUtils;
 
 public class OutgoingPOJOMessageBundle<M extends Writable> implements
     OutgoingMessageManager<M> {
 
+  private HamaConfiguration conf;
+  private BSPMessageCompressor<M> compressor;
   private Combiner<M> combiner;
   private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
   private HashMap<InetSocketAddress, BSPMessageBundle<M>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<M>>();
 
   @SuppressWarnings("unchecked")
   @Override
-  public void init(Configuration conf) {
+  public void init(HamaConfiguration conf, BSPMessageCompressor<M> compressor) {
+    this.conf = conf;
+    this.compressor = compressor;
     final String combinerName = conf.get(Constants.COMBINER_CLASS);
     if (combinerName != null) {
       try {
-        Combiner<M> combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
+        this.combiner = (Combiner<M>) ReflectionUtils.newInstance(conf
             .getClassByName(combinerName));
-        this.combiner = combiner;
       } catch (ClassNotFoundException e) {
         // TODO Auto-generated catch block
         e.printStackTrace();
@@ -61,6 +65,8 @@ public class OutgoingPOJOMessageBundle<M
       BSPMessageBundle<M> bundle = outgoingBundles.get(targetPeerAddress);
       bundle.addMessage(msg);
       BSPMessageBundle<M> combined = new BSPMessageBundle<M>();
+      combined.setCompressor(compressor,
+          conf.getLong("hama.messenger.compression.threshold", 128));
       combined.addMessage(combiner.combine(bundle));
       outgoingBundles.put(targetPeerAddress, combined);
     } else {
@@ -79,7 +85,10 @@ public class OutgoingPOJOMessageBundle<M
     }
 
     if (!outgoingBundles.containsKey(targetPeerAddress)) {
-      outgoingBundles.put(targetPeerAddress, new BSPMessageBundle<M>());
+      BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+      bundle.setCompressor(compressor,
+          conf.getLong("hama.messenger.compression.threshold", 128));
+      outgoingBundles.put(targetPeerAddress, bundle);
     }
     return targetPeerAddress;
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressor.java Thu Feb 20 01:39:45 2014
@@ -20,32 +20,16 @@ package org.apache.hama.bsp.message.comp
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
 
 /**
- * Provides utilities for compressing and decompressing BSPMessageBundle.
+ * Provides utilities for compressing and decompressing byte array.
  * 
  */
 public abstract class BSPMessageCompressor<M extends Writable> {
 
   public static final Log LOG = LogFactory.getLog(BSPMessageCompressor.class);
 
-  /**
-   * Compresses the BSPMessageBundle and returns it in form of a
-   * BSPCompressedBundle.
-   * 
-   * @param bundle
-   * @return
-   */
-  public abstract BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle);
+  public abstract byte[] compress(byte[] bytes);
 
-  /**
-   * Decompresses a BSPCompressedBundle and returns the corresponding
-   * BSPMessageBundle.
-   * 
-   * @param compMsgBundle
-   * @return
-   */
-  public abstract BSPMessageBundle<M> decompressBundle(
-      BSPCompressedBundle compMsgBundle);
+  public abstract byte[] decompress(byte[] compressedBytes);
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/Bzip2Compressor.java Thu Feb 20 01:39:45 2014
@@ -23,11 +23,11 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
 import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hama.bsp.BSPMessageBundle;
 
 public class Bzip2Compressor<M extends Writable> extends
     BSPMessageCompressor<M> {
@@ -35,23 +35,19 @@ public class Bzip2Compressor<M extends W
   private final BZip2Codec codec = new BZip2Codec();
 
   @Override
-  public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
-    BSPCompressedBundle compMsgBundle = null;
+  public byte[] compress(byte[] bytes) {
     ByteArrayOutputStream bos = null;
     CompressionOutputStream sos = null;
     DataOutputStream dos = null;
+    byte[] compressedBytes = null;
 
     try {
       bos = new ByteArrayOutputStream();
       sos = codec.createOutputStream(bos);
       dos = new DataOutputStream(sos);
-
-      bundle.write(dos);
       dos.close(); // Flush the stream as no more data will be sent.
 
-      byte[] data = bos.toByteArray();
-      compMsgBundle = new BSPCompressedBundle(data);
-
+      compressedBytes = bos.toByteArray();
     } catch (IOException ioe) {
       LOG.error("Unable to compress", ioe);
     } finally {
@@ -62,7 +58,7 @@ public class Bzip2Compressor<M extends W
         LOG.warn("Failed to close compression streams.", e);
       }
     }
-    return compMsgBundle;
+    return compressedBytes;
   }
 
   /**
@@ -73,20 +69,17 @@ public class Bzip2Compressor<M extends W
    * @return
    */
   @Override
-  public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
+  public byte[] decompress(byte[] compressedBytes) {
     ByteArrayInputStream bis = null;
     CompressionInputStream sis = null;
     DataInputStream dis = null;
-    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+    byte[] bytes = null;
 
     try {
-      byte[] data = compMsgBundle.getData();
-      bis = new ByteArrayInputStream(data);
+      bis = new ByteArrayInputStream(compressedBytes);
       sis = codec.createInputStream(bis);
       dis = new DataInputStream(sis);
-
-      bundle.readFields(dis);
-
+      bytes = IOUtils.toByteArray(dis);
     } catch (IOException ioe) {
       LOG.error("Unable to decompress.", ioe);
     } finally {
@@ -99,7 +92,7 @@ public class Bzip2Compressor<M extends W
       }
     }
 
-    return bundle;
+    return bytes;
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/SnappyCompressor.java Thu Feb 20 01:39:45 2014
@@ -23,8 +23,8 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.BSPMessageBundle;
 import org.xerial.snappy.SnappyInputStream;
 import org.xerial.snappy.SnappyOutputStream;
 
@@ -32,22 +32,21 @@ public class SnappyCompressor<M extends 
     BSPMessageCompressor<M> {
 
   @Override
-  public BSPCompressedBundle compressBundle(BSPMessageBundle<M> bundle) {
-    BSPCompressedBundle compMsgBundle = null;
+  public byte[] compress(byte[] bytes) {
     ByteArrayOutputStream bos = null;
     SnappyOutputStream sos = null;
     DataOutputStream dos = null;
+    byte[] compressedBytes = null;
 
     try {
       bos = new ByteArrayOutputStream();
       sos = new SnappyOutputStream(bos);
       dos = new DataOutputStream(sos);
 
-      bundle.write(dos);
+      dos.write(bytes);
       dos.close(); // Flush the stream as no more data will be sent.
 
-      byte[] data = bos.toByteArray();
-      compMsgBundle = new BSPCompressedBundle(data);
+      compressedBytes = bos.toByteArray();
 
     } catch (IOException ioe) {
       LOG.error("Unable to compress", ioe);
@@ -59,7 +58,7 @@ public class SnappyCompressor<M extends 
         LOG.warn("Failed to close compression streams.", e);
       }
     }
-    return compMsgBundle;
+    return compressedBytes;
   }
 
   /**
@@ -70,20 +69,18 @@ public class SnappyCompressor<M extends 
    * @return
    */
   @Override
-  public BSPMessageBundle<M> decompressBundle(BSPCompressedBundle compMsgBundle) {
+  public byte[] decompress(byte[] compressedBytes) {
     ByteArrayInputStream bis = null;
     SnappyInputStream sis = null;
     DataInputStream dis = null;
-    BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
+    byte[] bytes = null;
 
     try {
-      byte[] data = compMsgBundle.getData();
-      bis = new ByteArrayInputStream(data);
+      bis = new ByteArrayInputStream(compressedBytes);
       sis = new SnappyInputStream(bis);
       dis = new DataInputStream(sis);
 
-      bundle.readFields(dis);
-
+      bytes = IOUtils.toByteArray(dis);
     } catch (IOException ioe) {
       LOG.error("Unable to decompress.", ioe);
     } finally {
@@ -96,7 +93,7 @@ public class SnappyCompressor<M extends 
       }
     }
 
-    return bundle;
+    return bytes;
   }
 
 }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu Feb 20 01:39:45 2014
@@ -76,7 +76,7 @@ public class TestCheckpoint extends Test
 
     @Override
     public void init(TaskAttemptID attemptId, BSPPeer<?, ?, ?, ?, Text> peer,
-        Configuration conf, InetSocketAddress peerAddress) {
+        HamaConfiguration conf, InetSocketAddress peerAddress) {
       // TODO Auto-generated method stub
 
     }
@@ -130,9 +130,8 @@ public class TestCheckpoint extends Test
       listener.onMessageReceived(message);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
-    public void loopBackMessages(BSPMessageBundle<? extends Writable> bundle) {
+    public void loopBackMessages(BSPMessageBundle<Text> bundle) {
       this.loopbackBundle = (BSPMessageBundle<Text>) bundle;
     }
 
@@ -146,11 +145,11 @@ public class TestCheckpoint extends Test
       this.listener = listener;
     }
 
-	@Override
-	public InetSocketAddress getListenerAddress() {
-		// TODO Auto-generated method stub
-		return null;
-	}
+    @Override
+    public InetSocketAddress getListenerAddress() {
+      // TODO Auto-generated method stub
+      return null;
+    }
 
   }
 
@@ -444,8 +443,8 @@ public class TestCheckpoint extends Test
   }
 
   private static void checkSuperstepMsgCount(PeerSyncClient syncClient,
-      @SuppressWarnings("rawtypes") BSPPeer bspTask, BSPJob job, long step,
-      long count) {
+      @SuppressWarnings("rawtypes")
+      BSPPeer bspTask, BSPJob job, long step, long count) {
 
     ArrayWritable writableVal = new ArrayWritable(LongWritable.class);
 
@@ -648,7 +647,7 @@ public class TestCheckpoint extends Test
 
     BSPMessageBundle<Text> bundleRead = messenger.getLoopbackBundle();
     assertEquals(5, bundleRead.size());
-    
+
     String recoveredMsg = bundleRead.iterator().next().toString();
     assertEquals(recoveredMsg, "data");
     dfs.delete(new Path("checkpoint"), true);

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Thu Feb 20 01:39:45 2014
@@ -17,17 +17,21 @@
  */
 package org.apache.hama.bsp.message.compress;
 
-import java.util.Iterator;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hama.bsp.message.type.IntegerMessage;
 
 public class TestBSPMessageCompressor extends TestCase {
 
-  public void testCompression() {
+  public void testCompression() throws IOException {
     Configuration configuration = new Configuration();
     BSPMessageCompressor<IntegerMessage> compressor = new BSPMessageCompressorFactory<IntegerMessage>()
         .getCompressor(configuration);
@@ -40,24 +44,27 @@ public class TestBSPMessageCompressor ex
 
     assertNotNull(compressor);
 
-    int n = 20;
-    BSPMessageBundle<IntegerMessage> bundle = new BSPMessageBundle<IntegerMessage>();
-    IntegerMessage[] dmsg = new IntegerMessage[n];
-
-    for (int i = 1; i <= n; i++) {
-      dmsg[i - 1] = new IntegerMessage("" + i, i);
-      bundle.addMessage(dmsg[i - 1]);
-    }
-
-    BSPCompressedBundle compBundle = compressor.compressBundle(bundle);
-    BSPMessageBundle<IntegerMessage> uncompBundle = compressor
-        .decompressBundle(compBundle);
-
-    int i = 1;
-    Iterator<IntegerMessage> it = uncompBundle.iterator();
-    while(it.hasNext()) {
-      assertEquals((int) it.next().getData(), i);
-      i++;
-    }
+    IntWritable a = new IntWritable(123);
+    IntWritable b = new IntWritable(321);
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(bos);
+    a.write(dos);
+    b.write(dos);
+
+    byte[] x = bos.toByteArray();
+
+    byte[] compressed = compressor.compress(x);
+    byte[] decompressed = compressor.decompress(compressed);
+
+    ByteArrayInputStream bis = new ByteArrayInputStream(decompressed);
+    DataInputStream dis = new DataInputStream(bis);
+
+    IntWritable c = new IntWritable();
+    c.readFields(dis);
+    assertEquals(123, c.get());
+
+    IntWritable d = new IntWritable();
+    d.readFields(dis);
+    assertEquals(321, d.get());
   }
 }

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Thu Feb 20 01:39:45 2014
@@ -38,6 +38,7 @@ import org.apache.hama.bsp.ClusterStatus
 import org.apache.hama.bsp.FileOutputFormat;
 import org.apache.hama.bsp.NullInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.bsp.sync.SyncException;
 
 public class PiEstimator {
@@ -116,6 +117,9 @@ public class PiEstimator {
     HamaConfiguration conf = new HamaConfiguration();
 
     BSPJob bsp = new BSPJob(conf, PiEstimator.class);
+    bsp.setCompressionCodec(SnappyCompressor.class);
+    bsp.setCompressionThreshold(40);
+    
     // Set the job name
     bsp.setJobName("Pi Estimation Example");
     bsp.setBspClass(MyEstimator.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SemiClusterJobDriver.java Thu Feb 20 01:39:45 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hama.examples;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -28,14 +30,13 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.ml.semiclustering.SemiClusterMessage;
 import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
 import org.apache.hama.ml.semiclustering.SemiClusterVertexOutputWriter;
 import org.apache.hama.ml.semiclustering.SemiClusteringVertex;
 
-import java.io.IOException;
-
 public class SemiClusterJobDriver {
 
   protected static final Log LOG = LogFactory
@@ -50,6 +51,10 @@ public class SemiClusterJobDriver {
   public static void startTask(HamaConfiguration conf) throws IOException,
       InterruptedException, ClassNotFoundException {
     GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
+    // 80,887,377
+    semiClusterJob.setCompressionCodec(SnappyCompressor.class);
+    semiClusterJob.setCompressionThreshold(10);
+    
     semiClusterJob
         .setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
     semiClusterJob.setJobName("SemiClusterJob");

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/SemiClusterMatchingTest.java Thu Feb 20 01:39:45 2014
@@ -45,6 +45,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.TextInputFormat;
 import org.apache.hama.bsp.TextOutputFormat;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.ml.semiclustering.SemiClusterMessage;
 import org.apache.hama.ml.semiclustering.SemiClusterTextReader;
@@ -56,7 +57,6 @@ import org.junit.Test;
 public class SemiClusterMatchingTest extends TestCase {
   private static String INPUT = "/tmp/graph.txt";
   private static String OUTPUT = "/tmp/graph-semiCluster";
-  private static final String requestedGraphJobMaxIterationString = "hama.graph.max.iteration";
   private static final String semiClusterMaximumVertexCount = "semicluster.max.vertex.count";
   private static final String graphJobMessageSentCount = "semicluster.max.message.sent.count";
   private static final String graphJobVertexMaxClusterCount = "vertex.max.cluster.count";
@@ -225,11 +225,15 @@ public class SemiClusterMatchingTest ext
     try {
 
       HamaConfiguration conf = new HamaConfiguration();
-      conf.setInt(requestedGraphJobMaxIterationString, 15);
       conf.setInt(semiClusterMaximumVertexCount, 100);
       conf.setInt(graphJobMessageSentCount, 100);
       conf.setInt(graphJobVertexMaxClusterCount, 1);
       GraphJob semiClusterJob = new GraphJob(conf, SemiClusterJobDriver.class);
+      semiClusterJob.setMaxIteration(15);
+      
+      semiClusterJob.setCompressionCodec(SnappyCompressor.class);
+      semiClusterJob.setCompressionThreshold(10);
+      
       semiClusterJob
           .setVertexOutputWriterClass(SemiClusterVertexOutputWriter.class);
       semiClusterJob.setJobName("SemiClusterJob");

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/OutgoingVertexMessagesManager.java Thu Feb 20 01:39:45 2014
@@ -27,13 +27,14 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.Constants;
+import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.message.OutgoingMessageManager;
+import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.util.BSPNetUtils;
 
 public class OutgoingVertexMessagesManager<M extends Writable> implements
@@ -41,6 +42,8 @@ public class OutgoingVertexMessagesManag
   protected static final Log LOG = LogFactory
       .getLog(OutgoingVertexMessagesManager.class);
 
+  private HamaConfiguration conf;
+  private BSPMessageCompressor<GraphJobMessage> compressor;
   private Combiner<Writable> combiner;
   private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
   private HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>> outgoingBundles = new HashMap<InetSocketAddress, BSPMessageBundle<GraphJobMessage>>();
@@ -51,7 +54,10 @@ public class OutgoingVertexMessagesManag
 
   @SuppressWarnings("unchecked")
   @Override
-  public void init(Configuration conf) {
+  public void init(HamaConfiguration conf,
+      BSPMessageCompressor<GraphJobMessage> compressor) {
+    this.conf = conf;
+    this.compressor = compressor;
     if (!conf.getClass(Constants.COMBINER_CLASS, Combiner.class).equals(
         Combiner.class)) {
       LOG.debug("Combiner class: " + conf.get(Constants.COMBINER_CLASS));
@@ -112,8 +118,9 @@ public class OutgoingVertexMessagesManag
     }
 
     if (!outgoingBundles.containsKey(targetPeerAddress)) {
-      outgoingBundles.put(targetPeerAddress,
-          new BSPMessageBundle<GraphJobMessage>());
+      BSPMessageBundle<GraphJobMessage> bundle = new BSPMessageBundle<GraphJobMessage>();
+      bundle.setCompressor(compressor, conf.getLong("hama.messenger.compression.threshold", 128));
+      outgoingBundles.put(targetPeerAddress, bundle);
     }
     return targetPeerAddress;
   }
@@ -141,4 +148,5 @@ public class OutgoingVertexMessagesManag
     vertexMessageMap.clear();
     return outgoingBundles.entrySet().iterator();
   }
+
 }

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1570039&r1=1570038&r2=1570039&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu Feb 20 01:39:45 2014
@@ -36,6 +36,7 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TestBSPMasterGroomServer;
+import org.apache.hama.bsp.message.compress.SnappyCompressor;
 import org.apache.hama.commons.io.TextArrayWritable;
 import org.apache.hama.graph.example.PageRank;
 import org.apache.hama.graph.example.PageRank.PagerankSeqReader;
@@ -85,6 +86,7 @@ public class TestSubmitGraphJob extends 
     // set the defaults
     bsp.setMaxIteration(30);
 
+    bsp.setCompressionCodec(SnappyCompressor.class);
     bsp.setAggregatorClass(AverageAggregator.class);
 
     bsp.setInputFormat(SequenceFileInputFormat.class);