You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/12 03:20:00 UTC

svn commit: r1659134 - in /hama/trunk: core/src/main/java/org/apache/hama/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/queue/ core/src/test/java/org/apache/hama/...

Author: edwardyoon
Date: Thu Feb 12 02:19:59 2015
New Revision: 1659134

URL: http://svn.apache.org/r1659134
Log:
HAMA-903: Refactor Queue interface and implementations

Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
    hama/trunk/core/src/main/java/org/apache/hama/Constants.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Thu Feb 12 02:19:59 2015
@@ -162,7 +162,9 @@ public interface Constants {
   static final String CLUSTER_IS_DISTRIBUTED = "true";
 
   // Other constants
-
+  static final String MESSENGER_RUNTIME_COMPRESSION = "hama.messenger.runtime.compression";
+  static final String MESSENGER_COMPRESSION_THRESHOLD = "hama.messenger.compression.threshold";
+  
   /**
    * An empty instance.
    */

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Feb 12 02:19:59 2015
@@ -79,9 +79,8 @@ public class BSPMessageBundle<M extends
         className = message.getClass().getName();
       }
 
-      msgBytes = serialize(message);
-
       if (compressor != null) {
+        msgBytes = serialize(message);
         if (msgBytes.length > threshold) {
           bufferDos.writeBoolean(true);
           msgBytes = compressor.compress(msgBytes);
@@ -90,7 +89,7 @@ public class BSPMessageBundle<M extends
           bufferDos.writeBoolean(false);
         }
       }
-      bufferDos.write(msgBytes);
+      message.write(bufferDos);
       bundleSize++;
     } catch (IOException e) {
       LOG.error(e);
@@ -134,13 +133,8 @@ public class BSPMessageBundle<M extends
           }
 
           msg = ReflectionUtils.newInstance(clazz, null);
-          boolean isCompressed = false;
-
-          if (compressor != null) {
-            isCompressed = dis.readBoolean();
-          }
 
-          if (isCompressed) {
+          if (compressor != null && dis.readBoolean()) {
             int length = dis.readInt();
             msgBytes = new byte[length];
             dis.readFully(msgBytes);

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 12 02:19:59 2015
@@ -368,8 +368,8 @@ public final class BSPPeerImpl<K1, V1, K
       InterruptedException {
 
     // normally all messages should been send now, finalizing the send phase
-    Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> it = messenger
-        .getOutgoingBundles();
+    Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> it;
+    it = messenger.getOutgoingBundles();
 
     while (it.hasNext()) {
       Entry<InetSocketAddress, BSPMessageBundle<M>> entry = it.next();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Feb 12 02:19:59 2015
@@ -354,9 +354,9 @@ public class LocalBSPRunner implements J
       peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
           bundle.getLength());
 
-      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+      if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
         bundle.setCompressor(compressor,
-            conf.getLong("hama.messenger.compression.threshold", 512));
+            conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 512));
       }
 
       Iterator<M> it = bundle.iterator();

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Feb 12 02:19:59 2015
@@ -28,8 +28,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
@@ -37,7 +37,6 @@ import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
-import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.message.queue.SingleLockQueue;
@@ -108,14 +107,6 @@ public abstract class AbstractMessageMan
     try {
       outgoingMessageManager.clear();
       localQueue.close();
-      // remove possible disk queues from the path
-      try {
-        FileSystem.get(conf).delete(
-            DiskQueue.getQueueDir(conf, attemptId,
-                conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
-      } catch (IOException e) {
-        LOG.warn("Queue dir couldn't be deleted");
-      }
     } finally {
       notifyClose();
     }
@@ -151,9 +142,6 @@ public abstract class AbstractMessageMan
     if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
         && localQueue.size() > 0) {
 
-      if (localQueue.isMemoryBasedQueue()
-          && localQueueForNextIteration.isMemoryBasedQueue()) {
-
         // To reduce the number of element additions
         if (localQueue.size() > localQueueForNextIteration.size()) {
           localQueue.addAll(localQueueForNextIteration);
@@ -162,16 +150,6 @@ public abstract class AbstractMessageMan
           localQueue = localQueueForNextIteration.getMessageQueue();
         }
 
-      } else {
-
-        // TODO find the way to switch disk-based queue efficiently.
-        localQueueForNextIteration.addAll(localQueue);
-        if (localQueue != null) {
-          localQueue.close();
-        }
-        localQueue = localQueueForNextIteration.getMessageQueue();
-
-      }
     } else {
       if (localQueue != null) {
         localQueue.close();
@@ -180,7 +158,6 @@ public abstract class AbstractMessageMan
       localQueue = localQueueForNextIteration.getMessageQueue();
     }
 
-    localQueue.prepareRead();
     localQueueForNextIteration = getSynchronizedReceiverQueue();
     notifyInit();
   }
@@ -279,17 +256,13 @@ public abstract class AbstractMessageMan
 
   @Override
   public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
-    if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+    if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
       bundle.setCompressor(compressor,
-          conf.getLong("hama.messenger.compression.threshold", 128));
+          conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
     }
 
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, bundle.size());
-    
-    Iterator<? extends Writable> it = bundle.iterator();
-    while (it.hasNext()) {
-      loopBackMessage(it.next());
-    }
+    this.localQueueForNextIteration.addBundle(bundle);
   }
 
   @SuppressWarnings("unchecked")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java Thu Feb 12 02:19:59 2015
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
@@ -47,9 +48,9 @@ public abstract class AbstractOutgoingMe
 
     if (!outgoingBundles.containsKey(targetPeerAddress)) {
       BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
-      if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+      if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
         bundle.setCompressor(compressor,
-            conf.getLong("hama.messenger.compression.threshold", 128));
+            conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
       }
       outgoingBundles.put(targetPeerAddress, bundle);
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Thu Feb 12 02:19:59 2015
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentLi
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
@@ -31,9 +32,19 @@ public final class MemoryQueue<M extends
     SynchronizedQueue<M> {
 
   private final ConcurrentLinkedQueue<M> deque = new ConcurrentLinkedQueue<M>();
+  private final ConcurrentLinkedQueue<BSPMessageBundle<M>> bundles = new ConcurrentLinkedQueue<BSPMessageBundle<M>>();
+  private int numOfMsg = 0;
+  private Iterator<M> currIterator;
+
   private Configuration conf;
 
   @Override
+  public void addBundle(BSPMessageBundle<M> bundle) {
+    numOfMsg += bundle.size();
+    bundles.add(bundle);
+  }
+
+  @Override
   public final void addAll(Iterable<M> col) {
     for (M m : col)
       deque.add(m);
@@ -59,12 +70,20 @@ public final class MemoryQueue<M extends
 
   @Override
   public final M poll() {
-    return deque.poll();
+    if (currIterator == null || !currIterator.hasNext()) {
+      if (bundles.size() > 0)
+        currIterator = bundles.poll().iterator();
+      else
+        return deque.poll();
+    }
+
+    numOfMsg--;
+    return currIterator.next();
   }
 
   @Override
   public final int size() {
-    return deque.size();
+    return numOfMsg + deque.size();
   }
 
   @Override
@@ -82,10 +101,10 @@ public final class MemoryQueue<M extends
     return conf;
   }
 
-  // not doing much here
   @Override
   public void init(Configuration conf, TaskAttemptID id) {
-
+    this.numOfMsg = 0;
+    this.conf = conf;
   }
 
   @Override
@@ -94,27 +113,8 @@ public final class MemoryQueue<M extends
   }
 
   @Override
-  public void prepareRead() {
-
-  }
-
-  @Override
-  public void prepareWrite() {
-
-  }
-
-  @Override
-  public boolean isMessageSerialized() {
-    return false;
-  }
-
-  @Override
-  public boolean isMemoryBasedQueue() {
-    return true;
-  }
-
-  @Override
   public MessageQueue<M> getMessageQueue() {
     return this;
   }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Thu Feb 12 02:19:59 2015
@@ -19,12 +19,15 @@ package org.apache.hama.bsp.message.queu
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * Simple queue interface.
  */
-public interface MessageQueue<M> extends Iterable<M>, Configurable {
+public interface MessageQueue<M extends Writable> extends Iterable<M>,
+    Configurable {
 
   public static final String PERSISTENT_QUEUE = "hama.queue.behaviour.persistent";
 
@@ -39,16 +42,6 @@ public interface MessageQueue<M> extends
   public void close();
 
   /**
-   * Called to prepare a queue for reading.
-   */
-  public void prepareRead();
-
-  /**
-   * Called to prepare a queue for writing.
-   */
-  public void prepareWrite();
-
-  /**
    * Adds a whole Java Collection to the implementing queue.
    */
   public void addAll(Iterable<M> col);
@@ -59,6 +52,13 @@ public interface MessageQueue<M> extends
   public void addAll(MessageQueue<M> otherqueue);
 
   /**
+   * Adds the messages of the received bundle to the implementing queue.
+   * 
+   * @param bundle the bundle of messages to add
+   */
+  public void addBundle(BSPMessageBundle<M> bundle);
+
+  /**
    * Adds a single item to the implementing queue.
    */
   public void add(M item);
@@ -80,12 +80,4 @@ public interface MessageQueue<M> extends
    */
   public int size();
 
-  /**
-   * 
-   * @return true if the messages in the queue are serialized to byte buffers.
-   */
-  public boolean isMessageSerialized();
-  
-  public boolean isMemoryBasedQueue();
-
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Thu Feb 12 02:19:59 2015
@@ -20,12 +20,15 @@ package org.apache.hama.bsp.message.queu
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * A global mutex based synchronized queue.
  */
-public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+public final class SingleLockQueue<T extends Writable> implements
+    SynchronizedQueue<T> {
 
   private final MessageQueue<T> queue;
   private final Object mutex;
@@ -101,17 +104,6 @@ public final class SingleLockQueue<T> im
 
   /*
    * (non-Javadoc)
-   * @see org.apache.hama.bsp.message.SynchronizedQueue#prepareRead()
-   */
-  @Override
-  public void prepareRead() {
-    synchronized (mutex) {
-      queue.prepareRead();
-    }
-  }
-
-  /*
-   * (non-Javadoc)
    * @see
    * org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection)
    */
@@ -134,6 +126,13 @@ public final class SingleLockQueue<T> im
     }
   }
 
+  @Override
+  public void addBundle(BSPMessageBundle<T> bundle) {
+    synchronized (mutex) {
+      queue.addBundle(bundle);
+    }
+  }
+
   /*
    * (non-Javadoc)
    * @see org.apache.hama.bsp.message.SynchronizedQueue#clear()
@@ -181,42 +180,20 @@ public final class SingleLockQueue<T> im
   /*
    * static constructor methods to be type safe
    */
-  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
-    if(queue.isMemoryBasedQueue()) {
-      return (SynchronizedQueue<T>) queue;
-    }
-    
-    return new SingleLockQueue<T>(queue);
+  public static <T extends Writable> SynchronizedQueue<T> synchronize(
+      MessageQueue<T> queue) {
+    return (SynchronizedQueue<T>) queue;
   }
 
-  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
-      Object mutex) {
+  public static <T extends Writable> SynchronizedQueue<T> synchronize(
+      MessageQueue<T> queue, Object mutex) {
     return new SingleLockQueue<T>(queue, mutex);
   }
 
   @Override
-  public void prepareWrite() {
-    synchronized (mutex) {
-      queue.prepareWrite();
-    }
-  }
-
-  @Override
   public void addAll(MessageQueue<T> otherqueue) {
     synchronized (mutex) {
       queue.addAll(otherqueue);
     }
   }
-
-  @Override
-  public boolean isMessageSerialized() {
-    synchronized (mutex) {
-      return queue.isMessageSerialized();
-    }
-  }
-
-  @Override
-  public boolean isMemoryBasedQueue() {
-    return true;
-  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Thu Feb 12 02:19:59 2015
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * sorted receive and send.
  */
 public final class SortedMemoryQueue<M extends WritableComparable<M>>
-    implements SynchronizedQueue<M>, BSPMessageInterface<M> {
+    implements SynchronizedQueue<M> {
 
   private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
   private Configuration conf;
@@ -52,6 +52,11 @@ public final class SortedMemoryQueue<M e
   }
 
   @Override
+  public void addBundle(BSPMessageBundle<M> bundle) {
+    addAll(bundle);
+  }
+
+  @Override
   public void addAll(Iterable<M> col) {
     for (M m : col)
       queue.add(m);
@@ -86,10 +91,9 @@ public final class SortedMemoryQueue<M e
   }
 
   // empty, not needed to implement
-
   @Override
   public void init(Configuration conf, TaskAttemptID id) {
-
+    this.conf = conf;
   }
 
   @Override
@@ -98,31 +102,6 @@ public final class SortedMemoryQueue<M e
   }
 
   @Override
-  public void prepareRead() {
-
-  }
-
-  @Override
-  public void prepareWrite() {
-
-  }
-
-  @Override
-  public boolean isMessageSerialized() {
-    return false;
-  }
-
-  @Override
-  public void add(BSPMessageBundle<M> bundle) {
-    addAll(bundle);
-  }
-
-  @Override
-  public boolean isMemoryBasedQueue() {
-    return true;
-  }
-
-  @Override
   public MessageQueue<M> getMessageQueue() {
     return this;
   }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Thu Feb 12 02:19:59 2015
@@ -17,11 +17,13 @@
  */
 package org.apache.hama.bsp.message.queue;
 
+import org.apache.hadoop.io.Writable;
+
 /**
  * Synchronized Queue interface. Can be used to implement better synchronized
  * datastructures.
  */
-public interface SynchronizedQueue<T> extends MessageQueue<T> {
+public interface SynchronizedQueue<T extends Writable> extends MessageQueue<T> {
 
   public abstract MessageQueue<T> getMessageQueue();
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Feb 12 02:19:59 2015
@@ -32,7 +32,6 @@ import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.message.compress.SnappyCompressor;
-import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.examples.ClassSerializePrinting;
 
 public class TestBSPMasterGroomServer extends HamaCluster {
@@ -55,7 +54,6 @@ public class TestBSPMasterGroomServer ex
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.set("bsp.local.dir", "/tmp/hama-test");
-    configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     configuration.set("hama.sync.client.class",

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Thu Feb 12 02:19:59 2015
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.commons.util.KeyValuePair;
 
@@ -54,7 +53,6 @@ public class TestPartitioning extends Ha
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.set("bsp.local.dir", "/tmp/hama-test");
-    configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     configuration.set("hama.sync.client.class",

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Thu Feb 12 02:19:59 2015
@@ -37,14 +37,6 @@ public class TestPersistQueue extends Te
 
   public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
 
-  public void testDiskQueue() throws Exception {
-    BSPJob bsp = getNewJobConf();
-    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.DiskQueue");
-
-    assertTrue(bsp.waitForCompletion(true));
-  }
-
   public void testMemoryQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
     bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
@@ -60,14 +52,6 @@ public class TestPersistQueue extends Te
 
     assertTrue(bsp.waitForCompletion(true));
   }
-
-  public void testSpillingQueue() throws Exception {
-    BSPJob bsp = getNewJobConf();
-    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.SpillingQueue");
-
-    assertTrue(bsp.waitForCompletion(true));
-  }
 
   public BSPJob getNewJobConf() throws Exception {
     Configuration conf = new Configuration();

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java Thu Feb 12 02:19:59 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.util.BSPNetUtils;
@@ -49,15 +48,6 @@ public class TestHamaAsyncMessageManager
     HamaConfiguration conf = new HamaConfiguration();
     conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
         MessageQueue.class);
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
-    messagingInternal(conf);
-  }
-
-  public void testDiskMessaging() throws Exception {
-    HamaConfiguration conf = new HamaConfiguration();
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
-    conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
-        MessageQueue.class);
     messagingInternal(conf);
   }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Thu Feb 12 02:19:59 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.util.BSPNetUtils;
@@ -49,15 +48,6 @@ public class TestHamaMessageManager exte
     HamaConfiguration conf = new HamaConfiguration();
     conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
         MessageQueue.class);
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
-    messagingInternal(conf);
-  }
-
-  public void testDiskMessaging() throws Exception {
-    HamaConfiguration conf = new HamaConfiguration();
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
-    conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
-        MessageQueue.class);
     messagingInternal(conf);
   }
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Thu Feb 12 02:19:59 2015
@@ -49,7 +49,6 @@ import org.apache.hama.bsp.NullInputForm
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.commons.io.PipesKeyValueWritable;
 import org.apache.hama.commons.io.PipesVectorWritable;
 import org.apache.hama.commons.math.DenseDoubleVector;
@@ -94,8 +93,6 @@ public class TestPipes extends HamaClust
     assertEquals("Make sure master addr is set to localhost:", "localhost",
         configuration.get("bsp.master.address"));
     configuration.set("bsp.local.dir", HAMA_TMP_OUTPUT);
-    configuration
-        .set(DiskQueue.DISK_QUEUE_PATH_KEY, HAMA_TMP_DISK_QUEUE_OUTPUT);
     configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
     configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
     configuration.set("hama.sync.client.class",

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Thu Feb 12 02:19:59 2015
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentLi
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.message.queue.SynchronizedQueue;
@@ -50,6 +51,11 @@ public class IncomingVertexMessageManage
   }
 
   @Override
+  public void addBundle(BSPMessageBundle<GraphJobMessage> bundle) {
+    addAll(bundle);
+  }
+  
+  @Override
   public void addAll(Iterable<GraphJobMessage> col) {
     for (GraphJobMessage m : col)
       add(m);
@@ -92,10 +98,8 @@ public class IncomingVertexMessageManage
   }
 
   // empty, not needed to implement
-
   @Override
   public void init(Configuration conf, TaskAttemptID id) {
-
   }
 
   @Override
@@ -104,28 +108,9 @@ public class IncomingVertexMessageManage
   }
 
   @Override
-  public void prepareRead() {
-
-  }
-
-  @Override
-  public void prepareWrite() {
-
-  }
-
-  @Override
-  public boolean isMessageSerialized() {
-    return false;
-  }
-
-  @Override
-  public boolean isMemoryBasedQueue() {
-    return true;
-  }
-
-  @Override
   public MessageQueue<GraphJobMessage> getMessageQueue() {
     return this;
   }
 
+
 }