You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/08/12 04:25:44 UTC

svn commit: r1617401 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/ main/java/org/apache/hama/bsp/message/ main/java/org/apache/hama/bsp/message/io/ main/java/org/apache/hama/bsp/message/queue/ test/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/message/

Author: edwardyoon
Date: Tue Aug 12 02:25:44 2014
New Revision: 1617401

URL: http://svn.apache.org/r1617401
Log:
cleanup queue impls

Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Aug 12 02:25:44 2014
@@ -20,7 +20,6 @@ package org.apache.hama.bsp;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
@@ -356,12 +355,9 @@ public class LocalBSPRunner implements J
       bundle.setCompressor(compressor,
           conf.getLong("hama.messenger.compression.threshold", 512));
 
-      Iterator<M> it = bundle.iterator();
-      while (it.hasNext()) {
-        MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
-        peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
-            1L);
-      }
+      MANAGER_MAP.get(addr).localQueueForNextIteration.add(bundle);
+      peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+          bundle.size());
     }
 
     @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Aug 12 02:25:44 2014
@@ -232,7 +232,12 @@ public abstract class AbstractMessageMan
   }
 
   protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
-    return SingleLockQueue.synchronize(getReceiverQueue());
+    MessageQueue<M> queue = getReceiverQueue();
+    if (queue.isMemoryBasedQueue()) {
+      return (SynchronizedQueue<M>) queue;
+    }
+
+    return new SingleLockQueue<M>(queue);
   }
 
   @Override
@@ -281,11 +286,9 @@ public abstract class AbstractMessageMan
   public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
     bundle.setCompressor(compressor,
         conf.getLong("hama.messenger.compression.threshold", 128));
-
-    Iterator<? extends Writable> it = bundle.iterator();
-    while (it.hasNext()) {
-      loopBackMessage(it.next());
-    }
+    this.localQueueForNextIteration.add(bundle);
+    peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+        bundle.size());
   }
 
   @SuppressWarnings("unchecked")

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Tue Aug 12 02:25:44 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.ObjectWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
@@ -45,7 +46,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * configuration. <br/>
  * <b>It is experimental to use.</b>
  */
-public final class DiskQueue<M extends Writable> extends DefaultMessageQueue<M> {
+public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
 
   public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
 
@@ -170,7 +171,14 @@ public final class DiskQueue<M extends W
   }
 
   @Override
+  public void add(BSPMessageBundle<M> bundle){
+    addAll(bundle);
+  }
+
+  @Override
   public final void addAll(Iterable<M> col) {
+    // TODO Write bundle object directly
+
     for (M item : col) {
       add(item);
     }
@@ -313,4 +321,5 @@ public final class DiskQueue<M extends W
   public boolean isMemoryBasedQueue() {
     return false;
   }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Tue Aug 12 02:25:44 2014
@@ -22,54 +22,104 @@ import java.util.concurrent.ConcurrentLi
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */
 public final class MemoryQueue<M extends Writable> implements
-    SynchronizedQueue<M> {
+    SynchronizedQueue<M>, MessageQueue<M> {
+
+  private final ConcurrentLinkedQueue<M> messages = new ConcurrentLinkedQueue<M>();
+  private final ConcurrentLinkedQueue<BSPMessageBundle<M>> bundles = new ConcurrentLinkedQueue<BSPMessageBundle<M>>();
+  private Iterator<M> bundleIterator;
+
+  int bundledMessageSize = 0;
 
-  private final ConcurrentLinkedQueue<M> deque = new ConcurrentLinkedQueue<M>();
   private Configuration conf;
 
   @Override
   public final void addAll(Iterable<M> col) {
     for (M m : col)
-      deque.add(m);
+      messages.add(m);
   }
 
   @Override
   public void addAll(MessageQueue<M> otherqueue) {
     M poll = null;
     while ((poll = otherqueue.poll()) != null) {
-      deque.add(poll);
+      messages.add(poll);
     }
   }
 
   @Override
   public final void add(M item) {
-    deque.add(item);
+    messages.add(item);
+  }
+
+  @Override
+  public void add(BSPMessageBundle<M> bundle) {
+    bundledMessageSize += bundle.size();
+    bundles.add(bundle);
   }
 
   @Override
   public final void clear() {
-    deque.clear();
+    messages.clear();
+    bundles.clear();
+    bundleIterator = null;
   }
 
   @Override
   public final M poll() {
-    return deque.poll();
+    if (messages.size() > 0) {
+      return messages.poll();
+    } else {
+      if (bundles.size() > 0) {
+        if (bundleIterator == null) {
+          bundleIterator = bundles.poll().iterator();
+        } else {
+          if (!bundleIterator.hasNext()) {
+            bundleIterator = bundles.poll().iterator();
+          }
+        }
+
+        bundledMessageSize--;
+        return bundleIterator.next();
+      }
+    }
+
+    return null;
   }
 
   @Override
   public final int size() {
-    return deque.size();
+    return messages.size() + bundledMessageSize;
   }
 
   @Override
   public final Iterator<M> iterator() {
-    return deque.iterator();
+    Iterator<M> it = new Iterator<M>() {
+
+      @Override
+      public boolean hasNext() {
+        if (size() > 0)
+          return true;
+        else
+          return false;
+      }
+
+      @Override
+      public M next() {
+        return poll();
+      }
+
+      @Override
+      public void remove() {
+      }
+    };
+    return it;
   }
 
   @Override
@@ -117,4 +167,5 @@ public final class MemoryQueue<M extends
   public MessageQueue<M> getMessageQueue() {
     return this;
   }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Tue Aug 12 02:25:44 2014
@@ -19,17 +19,23 @@ package org.apache.hama.bsp.message.queu
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * Simple queue interface.
  */
-public interface MessageQueue<M> extends Iterable<M>, Configurable {
+public interface MessageQueue<M extends Writable> extends Iterable<M>,
+    Configurable {
 
   public static final String PERSISTENT_QUEUE = "hama.queue.behaviour.persistent";
 
   /**
    * Used to initialize the queue.
+   * 
+   * @param conf
+   * @param id
    */
   public void init(Configuration conf, TaskAttemptID id);
 
@@ -50,20 +56,33 @@ public interface MessageQueue<M> extends
 
   /**
    * Adds a whole Java Collection to the implementing queue.
+   * 
+   * @param col
    */
   public void addAll(Iterable<M> col);
 
   /**
    * Adds the other queue to this queue.
+   * 
+   * @param otherqueue
    */
   public void addAll(MessageQueue<M> otherqueue);
 
   /**
    * Adds a single item to the implementing queue.
+   * 
+   * @param item
    */
   public void add(M item);
 
   /**
+   * Adds a bundle to the queue.
+   * 
+   * @param bundle
+   */
+  public void add(BSPMessageBundle<M> bundle);
+
+  /**
    * Clears all entries in the given queue.
    */
   public void clear();
@@ -85,7 +104,10 @@ public interface MessageQueue<M> extends
    * @return true if the messages in the queue are serialized to byte buffers.
    */
   public boolean isMessageSerialized();
-  
+
+  /**
+   * @return true if the queue is memory resident.
+   */
   public boolean isMemoryBasedQueue();
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Tue Aug 12 02:25:44 2014
@@ -20,22 +20,24 @@ package org.apache.hama.bsp.message.queu
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * A global mutex based synchronized queue.
  */
-public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+public final class SingleLockQueue<T extends Writable> implements SynchronizedQueue<T> {
 
   private final MessageQueue<T> queue;
   private final Object mutex;
 
-  private SingleLockQueue(MessageQueue<T> queue) {
+  public SingleLockQueue(MessageQueue<T> queue) {
     this.queue = queue;
     this.mutex = new Object();
   }
 
-  private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+  public SingleLockQueue(MessageQueue<T> queue, Object mutex) {
     this.queue = queue;
     this.mutex = mutex;
   }
@@ -178,22 +180,6 @@ public final class SingleLockQueue<T> im
     }
   }
 
-  /*
-   * static constructor methods to be type safe
-   */
-  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
-    if(queue.isMemoryBasedQueue()) {
-      return (SynchronizedQueue<T>) queue;
-    }
-    
-    return new SingleLockQueue<T>(queue);
-  }
-
-  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
-      Object mutex) {
-    return new SingleLockQueue<T>(queue, mutex);
-  }
-
   @Override
   public void prepareWrite() {
     synchronized (mutex) {
@@ -219,4 +205,9 @@ public final class SingleLockQueue<T> im
   public boolean isMemoryBasedQueue() {
     return true;
   }
+
+  @Override
+  public void add(BSPMessageBundle<T> bundle) {
+    queue.add(bundle);
+  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Tue Aug 12 02:25:44 2014
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * sorted receive and send.
  */
 public final class SortedMemoryQueue<M extends WritableComparable<M>>
-    implements SynchronizedQueue<M>, BSPMessageInterface<M> {
+    implements SynchronizedQueue<M>, MessageQueue<M> {
 
   private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
   private Configuration conf;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Tue Aug 12 02:25:44 2014
@@ -17,11 +17,13 @@
  */
 package org.apache.hama.bsp.message.queue;
 
+import org.apache.hadoop.io.Writable;
+
 /**
  * Synchronized Queue interface. Can be used to implement better synchronized
  * datastructures.
  */
-public interface SynchronizedQueue<T> extends MessageQueue<T> {
+public interface SynchronizedQueue<T extends Writable> extends MessageQueue<T> {
 
   public abstract MessageQueue<T> getMessageQueue();
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Aug 12 02:25:44 2014
@@ -37,14 +37,6 @@ public class TestPersistQueue extends Te
 
   public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
 
-  public void testDiskQueue() throws Exception {
-    BSPJob bsp = getNewJobConf();
-    bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.DiskQueue");
-
-    assertTrue(bsp.waitForCompletion(true));
-  }
-
   public void testMemoryQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
     bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
@@ -61,10 +53,10 @@ public class TestPersistQueue extends Te
     assertTrue(bsp.waitForCompletion(true));
   }
 
-  public void testSpillingQueue() throws Exception {
+  public void testDiskQueue() throws Exception {
     BSPJob bsp = getNewJobConf();
     bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
-        "org.apache.hama.bsp.message.queue.SpillingQueue");
+        "org.apache.hama.bsp.message.queue.DiskQueue");
 
     assertTrue(bsp.waitForCompletion(true));
   }

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Tue Aug 12 02:25:44 2014
@@ -49,7 +49,6 @@ public class TestHamaMessageManager exte
     HamaConfiguration conf = new HamaConfiguration();
     conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
         MessageQueue.class);
-    conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
     messagingInternal(conf);
   }
 
@@ -60,7 +59,7 @@ public class TestHamaMessageManager exte
         MessageQueue.class);
     messagingInternal(conf);
   }
-
+  
   private static void messagingInternal(HamaConfiguration conf)
       throws Exception {
     conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
@@ -102,10 +101,10 @@ public class TestHamaMessageManager exte
     }
 
     messageManager.transfer(peer, bundle);
-
+    
     messageManager.clearOutgoingMessages();
 
-    assertTrue(messageManager.getNumCurrentMessages() == 1);
+    assertEquals(messageManager.getNumCurrentMessages(), 1);
     IntWritable currentMessage = messageManager.getCurrentMessage();
 
     assertEquals(currentMessage.get(), 1337);