You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/04/20 17:10:58 UTC

svn commit: r1470171 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/bundle/ core/src/main/java/org/apache/hama/bsp/message/io/ core/src/main/java/org/apache/hama/bsp/message/queue/ co...

Author: surajsmenon
Date: Sat Apr 20 15:10:57 2013
New Revision: 1470171

URL: http://svn.apache.org/r1470171
Log:
Fixes for [HAMA-707] and [HAMA-722]

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Apr 20 15:10:57 2013
@@ -8,7 +8,11 @@ Release 0.7 (unreleased changes)
 
   BUG FIXES
 
+
   IMPROVEMENTS
+   
+   HAMA-707: BSPMessageBundle should be able to encapsulate messages serialized in ByteBuffer (surajsmenon) 
+   HAMA-722: Messaging queue should construct sender and receiver queue.(surajsmenon)
 
 Release 0.6.1 - April 01, 2013
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sat Apr 20 15:10:57 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
@@ -40,7 +39,6 @@ import org.apache.hama.bsp.TaskAttemptID
 import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.MessageTransferQueue;
 import org.apache.hama.bsp.message.queue.SingleLockQueue;
 import org.apache.hama.bsp.message.queue.SynchronizedQueue;
 import org.apache.hama.util.BSPNetUtils;
@@ -208,12 +206,9 @@ public abstract class AbstractMessageMan
    * @return a <b>new</b> queue implementation.
    */
   protected MessageQueue<M> getSenderQueue() {
-    Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
-    LOG.debug("Creating new " + queueClass);
     @SuppressWarnings("unchecked")
-    MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
-        .newInstance(queueClass, conf);
-    MessageQueue<M> queue = newInstance.getSenderQueue();
+    MessageQueue<M> queue = MessageTransferQueueFactory
+        .getMessageTransferQueue(conf).getSenderQueue(conf);
     queue.init(conf, attemptId);
     return queue;
   }
@@ -227,12 +222,9 @@ public abstract class AbstractMessageMan
    * @return a <b>new</b> queue implementation.
    */
   protected MessageQueue<M> getReceiverQueue() {
-    Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
-    LOG.debug("Creating new " + queueClass);
     @SuppressWarnings("unchecked")
-    MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
-        .newInstance(queueClass, conf);
-    MessageQueue<M> queue = newInstance.getReceiverQueue();
+    MessageQueue<M> queue = MessageTransferQueueFactory
+        .getMessageTransferQueue(conf).getReceiverQueue(conf);
     queue.init(conf, attemptId);
     return queue;
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Sat Apr 20 15:10:57 2013
@@ -36,7 +36,11 @@ import org.apache.hama.bsp.message.queue
  */
 public interface MessageManager<M extends Writable> {
 
+  @Deprecated
   public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
+  public static final String RECEIVE_QUEUE_TYPE_CLASS = "hama.messenger.receive.queue.class";
+  public static final String SENDER_QUEUE_TYPE_CLASS = "hama.messenger.sender.queue.class";
+  public static final String TRANSFER_QUEUE_TYPE_CLASS = "hama.messenger.xfer.queue.class";
   public static final String MAX_CACHED_CONNECTIONS_KEY = "hama.messenger.max.cached.connections";
 
   /**

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageTransferQueue;
+import org.apache.hama.util.ReflectionUtils;
+
+/**
+ * Factory class to define protocols between the sender and receiver queues.
+ * 
+ * @param <M> The message type.
+ */
+public class MessageTransferQueueFactory<M> {
+
+  private static final Log LOG = LogFactory.getLog(MessageTransferQueueFactory.class);
+
+  private static class BackwardCompatibleTransferQueue<M> implements
+      MessageTransferQueue<M> {
+
+    @Override
+    public MessageQueue<M> getSenderQueue(Configuration conf) {
+      return getMessageQueue(conf);
+    }
+
+    @Override
+    public MessageQueue<M> getReceiverQueue(Configuration conf) {
+      return getMessageQueue(conf);
+    }
+
+    @SuppressWarnings({ "unchecked", "deprecation" })
+    private MessageQueue<M> getMessageQueue(Configuration conf) {
+      return ReflectionUtils.newInstance(conf.getClass(
+          MessageManager.QUEUE_TYPE_CLASS, MemoryQueue.class,
+          MessageQueue.class));
+    }
+
+  }
+
+  private static class DefaultMessageTransferQueue<M> implements
+      MessageTransferQueue<M> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public MessageQueue<M> getSenderQueue(Configuration conf) {
+      return ReflectionUtils.newInstance(conf.getClass(
+          MessageManager.SENDER_QUEUE_TYPE_CLASS, MemoryQueue.class,
+          MessageQueue.class));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public MessageQueue<M> getReceiverQueue(Configuration conf) {
+      return ReflectionUtils.newInstance(conf.getClass(
+          MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+          MessageQueue.class));
+    }
+
+  }
+
+  @SuppressWarnings({ "rawtypes", "deprecation" })
+  public static MessageTransferQueue getMessageTransferQueue(Configuration conf) {
+
+    if (conf.getClass(MessageManager.QUEUE_TYPE_CLASS, null) != null) {
+      LOG.warn("Message queue is configured on deprecated parameter:"
+          + MessageManager.QUEUE_TYPE_CLASS);
+      return new BackwardCompatibleTransferQueue();
+    }
+    return (MessageTransferQueue) ReflectionUtils.newInstance(conf.getClass(
+        MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        DefaultMessageTransferQueue.class, MessageTransferQueue.class));
+
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BSPMessageBundle stores a group of BSPMessages so that they can be sent in
+ * batch rather than individually.
+ * 
+ */
+public interface BSPMessageBundle<M extends Writable> {
+
+  /**
+   * Returns the size of the message.
+   * 
+   * @return Size of serialized message bundle. -1 if the size is not known.
+   */
+  public long getSize();
+
+  /**
+   * Returns the number of elements.
+   * 
+   * @return Number of elements. -1 if the number of elements is not known.
+   */
+  public int getNumElements();
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BSP Message Bundle that encapsulates a ByteBuffer.
+ * 
+ * @param <M> Message type.
+ */
+public class ByteBufferBSPMessageBundle<M extends Writable> implements
+    BSPMessageBundle<M> {
+
+  private ByteBuffer[] byteArr;
+  private int count;
+
+  public ByteBufferBSPMessageBundle(ByteBuffer[] buffer, int count) {
+    byteArr = buffer;
+    this.count = count;
+  }
+
+  public ByteBufferBSPMessageBundle(ByteBuffer[] buffer) {
+    this(buffer, -1);
+  }
+
+  public ByteBuffer[] getBuffers() {
+    return byteArr;
+  }
+
+  @Override
+  public long getSize() {
+    return byteArr.length;
+  }
+
+  @Override
+  public int getNumElements() {
+    return count;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BSP Message Bundle that stores the messages as heap byte arrays.
+ * 
+ * @param <M> Message type.
+ */
+public class HeapByteArrayBSPMessageBundle<M extends Writable> implements
+    BSPMessageBundle<M> {
+
+  byte[] byteArr;
+  int count;
+
+  public HeapByteArrayBSPMessageBundle(byte[] buffer) {
+    this(buffer, -1);
+  }
+
+  public HeapByteArrayBSPMessageBundle(byte[] buffer, int count) {
+    byteArr = buffer;
+    this.count = count;
+  }
+
+  public byte[] getBuffer() {
+    return byteArr;
+  }
+
+  @Override
+  public long getSize() {
+    return byteArr.length;
+  }
+
+  @Override
+  public int getNumElements() {
+    return byteArr.length;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+public class POJOMessageBundle<M extends Writable> implements
+    BSPMessageBundle<M>, Iterable<M> {
+
+  protected static final Log LOG = LogFactory.getLog(POJOMessageBundle.class);
+
+  protected HashMap<String, List<M>> messages = new HashMap<String, List<M>>();
+  protected HashMap<String, Class<M>> classCache = new HashMap<String, Class<M>>();
+
+  protected int numElements;
+
+  private static class BundleIterator<M extends Writable> implements
+      Iterator<M> {
+
+    private Iterator<List<M>> listIterator;
+    private Iterator<M> messageIterator;
+
+    public BundleIterator(Iterator<List<M>> listIterator) {
+      this.listIterator = listIterator;
+    }
+
+    @Override
+    public boolean hasNext() {
+      return listIterator.hasNext() || messageIterator.hasNext();
+    }
+
+    @Override
+    public M next() {
+      while (true) {
+        if (messageIterator != null && messageIterator.hasNext()) {
+          return messageIterator.next();
+        } else {
+          if (listIterator.hasNext()) {
+            messageIterator = listIterator.next().iterator();
+          } else {
+            return null;
+          }
+        }
+      }
+    }
+
+    @Override
+    public void remove() {
+    }
+
+  }
+
+  public POJOMessageBundle() {
+  }
+
+  /**
+   * Add message to this bundle.
+   * 
+   * @param message BSPMessage to add.
+   */
+  public void addMessage(M message) {
+    String className = message.getClass().getName();
+    List<M> list = messages.get(className);
+    ++numElements;
+    if (list == null) {
+      list = new ArrayList<M>();
+      messages.put(className, list);
+    }
+
+    list.add(message);
+  }
+
+  public List<M> getMessages() {
+    // here we use an arraylist, because we know the size and outside may need
+    // random access
+    List<M> mergeList = new ArrayList<M>(messages.size());
+    for (List<M> c : messages.values()) {
+      mergeList.addAll(c);
+    }
+    return mergeList;
+  }
+
+  @Override
+  public Iterator<M> iterator() {
+    return new BundleIterator<M>(this.messages.values().iterator());
+  }
+
+  @Override
+  public long getSize() {
+    return numElements;
+  }
+
+  @Override
+  public int getNumElements() {
+    return numElements;
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+public class WritableMessageBundle<M extends Writable> extends
+    POJOMessageBundle<M> implements Writable {
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // writes the k/v mapping size
+    out.writeInt(messages.size());
+    if (messages.size() > 0) {
+      for (Entry<String, List<M>> entry : messages.entrySet()) {
+        out.writeUTF(entry.getKey());
+        List<M> messageList = entry.getValue();
+        out.writeInt(messageList.size());
+        for (M msg : messageList) {
+          msg.write(out);
+        }
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void readFields(DataInput in) throws IOException {
+    if (messages == null) {
+      messages = new HashMap<String, List<M>>();
+    }
+    int numMessages = in.readInt();
+    if (numMessages > 0) {
+      for (int entries = 0; entries < numMessages; entries++) {
+        String className = in.readUTF();
+        int size = in.readInt();
+        List<M> msgList = new ArrayList<M>(size);
+        messages.put(className, msgList);
+
+        Class<M> clazz = null;
+        if ((clazz = classCache.get(className)) == null) {
+          try {
+            clazz = (Class<M>) Class.forName(className);
+            classCache.put(className, clazz);
+          } catch (ClassNotFoundException e) {
+            LOG.error("Class was not found.", e);
+          }
+        }
+
+        for (int i = 0; i < size; i++) {
+          M msg = ReflectionUtils.newInstance(clazz, null);
+          msg.readFields(in);
+          msgList.add(msg);
+        }
+
+      }
+    }
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Sat Apr 20 15:10:57 2013
@@ -441,6 +441,7 @@ public class SpillingDataOutputBuffer ex
         }
 
       }
+      closed_ = true;
     }
 
   }

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+
+public interface BSPMessageInterface<M extends Writable> {
+  
+  public void add(BSPMessageBundle<M> bundle);
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+
+public abstract class ByteArrayMessageQueue<M extends Writable> implements
+    BSPMessageInterface<M>, MessageQueue<M> {
+
+  @Override
+  public void add(BSPMessageBundle<M> bundle) {
+    // TODO Auto-generated method stub
+    
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Sat Apr 20 15:10:57 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.bsp.message.queue;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
@@ -46,8 +45,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * configuration. <br/>
  * <b>It is experimental to use.</b>
  */
-public final class DiskQueue<M extends Writable> implements MessageQueue<M>,
-    MessageTransferQueue<M> {
+public final class DiskQueue<M extends Writable> extends POJOMessageQueue<M> {
 
   public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
 
@@ -172,7 +170,7 @@ public final class DiskQueue<M extends W
   }
 
   @Override
-  public final void addAll(Collection<M> col) {
+  public final void addAll(Iterable<M> col) {
     for (M item : col) {
       add(item);
     }
@@ -310,15 +308,4 @@ public final class DiskQueue<M extends W
   public boolean isMessageSerialized() {
     return false;
   }
-
-  @Override
-  public MessageQueue<M> getSenderQueue() {
-    return this;
-  }
-
-  @Override
-  public MessageQueue<M> getReceiverQueue() {
-    return this;
-  }
-
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The disk transfer queue protocol.
+ *
+ * @param <M>
+ */
+public class DiskTransferProtocolQueue<M extends Writable> implements
+    MessageTransferQueue<M> {
+
+  @Override
+  public MessageQueue<M> getSenderQueue(Configuration conf) {
+    return new DiskQueue<M>();
+  }
+
+  @Override
+  public MessageQueue<M> getReceiverQueue(Configuration conf) {
+    return new DiskQueue<M>();
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Sat Apr 20 15:10:57 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.bsp.message.queue;
 
 import java.util.ArrayDeque;
-import java.util.Collection;
 import java.util.Deque;
 import java.util.Iterator;
 
@@ -29,15 +28,15 @@ import org.apache.hama.bsp.TaskAttemptID
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */
-public final class MemoryQueue<M extends Writable> implements MessageQueue<M>,
-    MessageTransferQueue<M> {
+public final class MemoryQueue<M extends Writable> extends POJOMessageQueue<M> {
 
   private final Deque<M> deque = new ArrayDeque<M>();
   private Configuration conf;
 
   @Override
-  public final void addAll(Collection<M> col) {
-    deque.addAll(col);
+  public final void addAll(Iterable<M> col) {
+    for (M m : col)
+      deque.add(m);
   }
 
   @Override
@@ -108,15 +107,4 @@ public final class MemoryQueue<M extends
   public boolean isMessageSerialized() {
     return false;
   }
-
-  @Override
-  public MessageQueue<M> getSenderQueue() {
-    return this;
-  }
-
-  @Override
-  public MessageQueue<M> getReceiverQueue() {
-    return this;
-  }
-
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Queue transfer protocol for memory queue.
+ *
+ * @param <M>
+ */
+public class MemoryTransferProtocol<M extends Writable> implements
+    MessageTransferQueue<M> {
+
+  @Override
+  public MessageQueue<M> getSenderQueue(Configuration conf) {
+    return new MemoryQueue<M>();
+  }
+
+  @Override
+  public MessageQueue<M> getReceiverQueue(Configuration conf) {
+    return new MemoryQueue<M>();
+  }
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Sat Apr 20 15:10:57 2013
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.Collection;
-
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hama.bsp.TaskAttemptID;
@@ -51,7 +49,7 @@ public interface MessageQueue<M> extends
   /**
    * Adds a whole Java Collection to the implementing queue.
    */
-  public void addAll(Collection<M> col);
+  public void addAll(Iterable<M> col);
 
   /**
    * Adds the other queue to this queue.

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java Sat Apr 20 15:10:57 2013
@@ -17,21 +17,23 @@
  */
 package org.apache.hama.bsp.message.queue;
 
+import org.apache.hadoop.conf.Configuration;
+
 /**
- * 
+ * Interface to define the sender queue and receiver queue protocol.
  * 
  * @param <M>
  */
 public interface MessageTransferQueue<M> {
-
+  
   /**
-   * 
+   * Instantiate a sender queue.
    */
-  public MessageQueue<M> getSenderQueue();
+  public MessageQueue<M> getSenderQueue(Configuration conf);
 
   /**
-   * 
+   * Instantiate a receiver queue.
    */
-  public MessageQueue<M> getReceiverQueue();
+  public MessageQueue<M> getReceiverQueue(Configuration conf);
 
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
+
+/**
+ * Java object message queue.
+ *
+ * @param <M> Message type.
+ */
+public abstract class POJOMessageQueue<M extends Writable> implements
+    BSPMessageInterface<M>, Iterable<M>, MessageQueue<M> {
+
+  @Override
+  public void add(BSPMessageBundle<M> bundle){
+    this.addAll((POJOMessageBundle<M>)bundle);
+  }
+  
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Sat Apr 20 15:10:57 2013
@@ -17,7 +17,6 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
@@ -117,9 +116,10 @@ public final class SingleLockQueue<T> im
    * org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection)
    */
   @Override
-  public void addAll(Collection<T> col) {
+  public void addAll(Iterable<T> col) {
     synchronized (mutex) {
-      queue.addAll(col);
+      for (T m : col)
+        queue.add(m);
     }
   }
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Sat Apr 20 15:10:57 2013
@@ -17,20 +17,21 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.PriorityQueue;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
 
 /**
  * Heap (Java's priority queue) based message queue implementation that supports
  * sorted receive and send.
  */
 public final class SortedMessageQueue<M extends WritableComparable<M>>
-    implements MessageQueue<M>, MessageTransferQueue<M> {
+    implements MessageQueue<M>, BSPMessageInterface<M> {
 
   private final PriorityQueue<M> queue = new PriorityQueue<M>();
   private Configuration conf;
@@ -51,8 +52,9 @@ public final class SortedMessageQueue<M 
   }
 
   @Override
-  public void addAll(Collection<M> col) {
-    queue.addAll(col);
+  public void addAll(Iterable<M> col) {
+    for (M m : col)
+      queue.add(m);
   }
 
   @Override
@@ -111,13 +113,8 @@ public final class SortedMessageQueue<M 
   }
 
   @Override
-  public MessageQueue<M> getSenderQueue() {
-    return this;
-  }
-
-  @Override
-  public MessageQueue<M> getReceiverQueue() {
-    return this;
+  public void add(BSPMessageBundle<M> bundle) {
+    addAll((POJOMessageBundle<M>) bundle);
   }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Sat Apr 20 15:10:57 2013
@@ -22,7 +22,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.math.BigInteger;
 import java.security.SecureRandom;
-import java.util.Collection;
 import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +32,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.message.bundle.HeapByteArrayBSPMessageBundle;
 import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
 import org.apache.hama.bsp.message.io.PreFetchCache;
 import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
@@ -44,8 +45,8 @@ import org.apache.hama.bsp.message.io.Sp
  * 
  * @param <M>
  */
-public class SpillingQueue<M extends Writable> implements MessageQueue<M>,
-    MessageTransferQueue<M> {
+public class SpillingQueue<M extends Writable> extends ByteArrayMessageQueue<M>
+    implements MessageTransferQueue<M> {
 
   private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
 
@@ -144,7 +145,7 @@ public class SpillingQueue<M extends Wri
   }
 
   @Override
-  public void addAll(Collection<M> msgs) {
+  public void addAll(Iterable<M> msgs) {
     for (M msg : msgs) {
       add(msg);
     }
@@ -342,13 +343,23 @@ public class SpillingQueue<M extends Wri
   }
 
   @Override
-  public MessageQueue<M> getSenderQueue() {
+  public MessageQueue<M> getSenderQueue(Configuration conf) {
     return this;
   }
 
   @Override
-  public MessageQueue<M> getReceiverQueue() {
+  public MessageQueue<M> getReceiverQueue(Configuration conf) {
     return this;
   }
 
+  @Override
+  public void add(BSPMessageBundle<M> bundle) {
+    try {
+      this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle<M>) bundle)
+          .getBuffer());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Queue transfer protocol for spilling queue.
+ *
+ * @param <M> The message type.
+ */
+public class SpillingQueueTransferProtocol<M extends Writable> implements
+    MessageTransferQueue<M> {
+
+  @Override
+  public MessageQueue<M> getSenderQueue(Configuration conf) {
+    return new SpillingQueue<M>();
+  }
+
+  @Override
+  public MessageQueue<M> getReceiverQueue(Configuration conf) {
+    return new SpillingQueue<M>();
+  }
+}

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Sat Apr 20 15:10:57 2013
@@ -33,8 +33,10 @@ import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.queue.DiskQueue;
-import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.DiskTransferProtocolQueue;
+import org.apache.hama.bsp.message.queue.MemoryTransferProtocol;
 import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageTransferQueue;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestHadoopMessageManager extends TestCase {
@@ -46,8 +48,8 @@ public class TestHadoopMessageManager ex
 
   public void testMemoryMessaging() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(MessageManager.QUEUE_TYPE_CLASS,
-        MemoryQueue.class.getCanonicalName());
+    conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        MemoryTransferProtocol.class, MessageTransferQueue.class);
     conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     messagingInternal(conf);
   }
@@ -55,6 +57,8 @@ public class TestHadoopMessageManager ex
   public void testDiskMessaging() throws Exception {
     Configuration conf = new Configuration();
     conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+    conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+        DiskTransferProtocolQueue.class, MessageTransferQueue.class);
     messagingInternal(conf);
   }
 

Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
+import org.apache.hama.bsp.message.bundle.WritableMessageBundle;
+
+public class TestMessageBundle extends TestCase {
+
+  public void testPOJOWritableMessageBundle() {
+
+    POJOMessageBundle<IntWritable> messageBundle = new POJOMessageBundle<IntWritable>();
+    for (int i = 0; i < 100; ++i) {
+      messageBundle.addMessage(new IntWritable(i));
+    }
+    assertEquals(100, messageBundle.getSize());
+    assertEquals(100, messageBundle.getNumElements());
+
+    int i = 0;
+    for (IntWritable writable : messageBundle) {
+      assertEquals(i++, writable.get());
+    }
+
+  }
+
+  public void testDifferentWritableMessageBundle() {
+    WritableMessageBundle<Writable> messageBundle = new WritableMessageBundle<Writable>();
+    int numElements = 5;
+
+    HashSet<Writable> set = new HashSet<Writable>();
+
+    for (int i = 0; i < numElements; ++i) {
+      Writable w = new IntWritable(i);
+      set.add(w);
+      messageBundle.addMessage(w);
+    }
+    String msg;
+    for (int i = 0; i < numElements; ++i) {
+      msg = "" + i;
+      Writable w = new Text(msg);
+      set.add(w);
+      messageBundle.addMessage(w);
+    }
+
+    assertEquals(2 * numElements, messageBundle.getSize());
+    assertEquals(2 * numElements, messageBundle.getNumElements());
+
+    for (Writable writable : messageBundle) {
+      set.remove(writable);
+    }
+    assertTrue(set.isEmpty());
+
+  }
+
+  public void testReadWriteWritableMessageBundle() throws IOException {
+    WritableMessageBundle<Writable> messageBundle = new WritableMessageBundle<Writable>();
+    int numElements = 5;
+
+    HashSet<Writable> set = new HashSet<Writable>();
+
+    for (int i = 0; i < numElements; ++i) {
+      Writable w = new IntWritable(i);
+      set.add(w);
+      messageBundle.addMessage(w);
+    }
+    String msg;
+    for (int i = 0; i < numElements; ++i) {
+      msg = "" + i;
+      Writable w = new Text(msg);
+      set.add(w);
+      messageBundle.addMessage(w);
+    }
+
+    assertEquals(2 * numElements, messageBundle.getSize());
+    assertEquals(2 * numElements, messageBundle.getNumElements());
+
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);
+    DataOutput output = new DataOutputStream(outputStream);
+    messageBundle.write(output);
+
+    ByteArrayInputStream inStream = new ByteArrayInputStream(
+        outputStream.toByteArray());
+    DataInput in = new DataInputStream(inStream);
+    WritableMessageBundle<Writable> newBundle = new WritableMessageBundle<Writable>();
+    newBundle.readFields(in);
+
+    for (Writable writable : newBundle) {
+      set.remove(writable);
+    }
+    assertTrue(set.isEmpty());
+
+  }
+
+}