You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/02/14 11:59:42 UTC

svn commit: r1446114 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/queue/

Author: surajsmenon
Date: Thu Feb 14 10:59:41 2013
New Revision: 1446114

URL: http://svn.apache.org/r1446114
Log:
[HAMA-722] Committing a minor refactor for the purpose

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 14 10:59:41 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
 
   NEW FEATURES
 
+   HAMA-722: Messaging queue should construct sender and receiver queue. (surajsmenon)
    HAMA-721: Added spilling queue with combiner. (surajsmenon)
    HAMA-559: Added spilling queue. (surajsmenon)
    HAMA-700: BSPPartitioner should be configurable to be optional and allow input format conversion (surajsmenon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Feb 14 10:59:41 2013
@@ -40,6 +40,7 @@ import org.apache.hama.bsp.TaskAttemptID
 import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.queue.MemoryQueue;
 import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageTransferQueue;
 import org.apache.hama.bsp.message.queue.SingleLockQueue;
 import org.apache.hama.bsp.message.queue.SynchronizedQueue;
 import org.apache.hama.util.BSPNetUtils;
@@ -59,7 +60,9 @@ public abstract class AbstractMessageMan
   protected Configuration conf;
   protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
   protected final HashMap<InetSocketAddress, MessageQueue<M>> outgoingQueues = new HashMap<InetSocketAddress, MessageQueue<M>>();
+
   protected MessageQueue<M> localQueue;
+
   // this must be a synchronized implementation: this is accessed per RPC
   protected SynchronizedQueue<M> localQueueForNextIteration;
   // this peer object is just used for counter incrementation
@@ -89,8 +92,8 @@ public abstract class AbstractMessageMan
     this.peer = peer;
     this.conf = conf;
     this.peerAddress = peerAddress;
-    this.localQueue = getQueue();
-    this.localQueueForNextIteration = getSynchronizedQueue();
+    this.localQueue = getSenderQueue();
+    this.localQueueForNextIteration = getSynchronizedReceiverQueue();
     this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
   }
 
@@ -158,7 +161,7 @@ public abstract class AbstractMessageMan
   public final void clearOutgoingQueues() {
     localQueue = localQueueForNextIteration.getMessageQueue();
     localQueue.prepareRead();
-    localQueueForNextIteration = getSynchronizedQueue();
+    localQueueForNextIteration = getSynchronizedReceiverQueue();
     notifyInit();
   }
 
@@ -179,7 +182,7 @@ public abstract class AbstractMessageMan
     }
     MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);
     if (queue == null) {
-      queue = getQueue();
+      queue = getSenderQueue();
     }
     queue.add(msg);
     peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
@@ -204,18 +207,42 @@ public abstract class AbstractMessageMan
    * 
    * @return a <b>new</b> queue implementation.
    */
-  protected MessageQueue<M> getQueue() {
+  protected MessageQueue<M> getSenderQueue() {
+    Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
+    LOG.debug("Creating new " + queueClass);
+    @SuppressWarnings("unchecked")
+    MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
+        .newInstance(queueClass, conf);
+    MessageQueue<M> queue = newInstance.getSenderQueue();
+    queue.init(conf, attemptId);
+    return queue;
+  }
+
+  /**
+   * Returns a new queue implementation based on what was configured. If nothing
+   * has been configured for "hama.messenger.queue.class" then the
+   * {@link MemoryQueue} is used. If you have scalability issues, then better
+   * use {@link DiskQueue}.
+   * 
+   * @return a <b>new</b> queue implementation.
+   */
+  protected MessageQueue<M> getReceiverQueue() {
     Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
     LOG.debug("Creating new " + queueClass);
     @SuppressWarnings("unchecked")
-    MessageQueue<M> newInstance = (MessageQueue<M>) ReflectionUtils
+    MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
         .newInstance(queueClass, conf);
-    newInstance.init(conf, attemptId);
-    return newInstance;
+    MessageQueue<M> queue = newInstance.getReceiverQueue();
+    queue.init(conf, attemptId);
+    return queue;
   }
 
-  protected SynchronizedQueue<M> getSynchronizedQueue() {
-    return SingleLockQueue.synchronize(getQueue());
+  protected SynchronizedQueue<M> getSynchronizedSenderQueue() {
+    return SingleLockQueue.synchronize(getSenderQueue());
+  }
+
+  protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
+    return SingleLockQueue.synchronize(getReceiverQueue());
   }
 
   @Override
@@ -229,27 +256,27 @@ public abstract class AbstractMessageMan
   }
 
   private void notifySentMessage(String peerName, M message) {
-      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
-          aMessageListenerQueue.onMessageSent(peerName, message);
-      }
+    for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+      aMessageListenerQueue.onMessageSent(peerName, message);
+    }
   }
 
   private void notifyReceivedMessage(M message) throws IOException {
-      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
-          aMessageListenerQueue.onMessageReceived(message);
-      }
+    for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+      aMessageListenerQueue.onMessageReceived(message);
+    }
   }
 
   private void notifyInit() {
-      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
-          aMessageListenerQueue.onInitialized();
-      }
+    for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+      aMessageListenerQueue.onInitialized();
+    }
   }
 
   private void notifyClose() {
-      for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
-          aMessageListenerQueue.onClose();
-      }
+    for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+      aMessageListenerQueue.onClose();
+    }
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Thu Feb 14 10:59:41 2013
@@ -46,7 +46,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * configuration. <br/>
  * <b>It is experimental to use.</b>
  */
-public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
+public final class DiskQueue<M extends Writable> implements MessageQueue<M>, MessageTransferQueue<M> {
 
   public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
 
@@ -310,4 +310,14 @@ public final class DiskQueue<M extends W
     return false;
   }
 
+  @Override
+  public MessageQueue<M> getSenderQueue() {
+    return this;
+  }
+
+  @Override
+  public MessageQueue<M> getReceiverQueue() {
+    return this;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Thu Feb 14 10:59:41 2013
@@ -29,7 +29,7 @@ import org.apache.hama.bsp.TaskAttemptID
 /**
  * LinkedList backed queue structure for bookkeeping messages.
  */
-public final class MemoryQueue<M extends Writable> implements MessageQueue<M> {
+public final class MemoryQueue<M extends Writable> implements MessageQueue<M>, MessageTransferQueue<M> {
 
   private final Deque<M> deque = new ArrayDeque<M>();
   private Configuration conf;
@@ -108,4 +108,14 @@ public final class MemoryQueue<M extends
     return false;
   }
 
+  @Override
+  public MessageQueue<M> getSenderQueue() {
+    return this;
+  }
+
+  @Override
+  public MessageQueue<M> getReceiverQueue() {
+    return this;
+  }
+
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java?rev=1446114&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java Thu Feb 14 10:59:41 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+/**
+ * 
+ *
+ * @param <M>
+ */
+public interface MessageTransferQueue<M> {
+  
+  /**
+   * 
+   */
+  public MessageQueue<M> getSenderQueue();
+  
+  /**
+   * 
+   */
+  public MessageQueue<M> getReceiverQueue();
+
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Thu Feb 14 10:59:41 2013
@@ -150,7 +150,7 @@ public final class SingleLockQueue<T> im
    * @see org.apache.hama.bsp.message.SynchronizedQueue#poll()
    */
   @Override
-  public Object poll() {
+  public T poll() {
     synchronized (mutex) {
       return queue.poll();
     }
@@ -190,4 +190,25 @@ public final class SingleLockQueue<T> im
       Object mutex) {
     return new SingleLockQueue<T>(queue, mutex);
   }
+
+  @Override
+  public void prepareWrite() {
+    synchronized (mutex) {
+      queue.prepareWrite();
+    }
+  }
+
+  @Override
+  public void addAll(MessageQueue<T> otherqueue) {
+    synchronized (mutex) {
+      queue.addAll(otherqueue);
+    }
+  }
+
+  @Override
+  public boolean isMessageSerialized() {
+    synchronized (mutex) {
+      return queue.isMessageSerialized();
+    }
+  }
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Thu Feb 14 10:59:41 2013
@@ -30,7 +30,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * sorted receive and send.
  */
 public final class SortedMessageQueue<M extends WritableComparable<M>>
-    implements MessageQueue<M> {
+    implements MessageQueue<M>, MessageTransferQueue<M> {
 
   private final PriorityQueue<M> queue = new PriorityQueue<M>();
   private Configuration conf;
@@ -110,4 +110,14 @@ public final class SortedMessageQueue<M 
     return false;
   }
 
+  @Override
+  public MessageQueue<M> getSenderQueue() {
+    return this;
+  }
+
+  @Override
+  public MessageQueue<M> getReceiverQueue() {
+    return this;
+  }
+
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Thu Feb 14 10:59:41 2013
@@ -44,7 +44,8 @@ import org.apache.hama.bsp.message.io.Sp
  * 
  * @param <M>
  */
-public class SpillingQueue<M extends Writable> implements MessageQueue<M> {
+public class SpillingQueue<M extends Writable> implements MessageQueue<M>,
+    MessageTransferQueue<M> {
 
   private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
 
@@ -152,9 +153,9 @@ public class SpillingQueue<M extends Wri
 
   @Override
   public void addAll(MessageQueue<M> arg0) {
-      for (M anArg0 : arg0) {
-          add(anArg0);
-      }
+    for (M anArg0 : arg0) {
+      add(anArg0);
+    }
   }
 
   @Override
@@ -334,12 +335,20 @@ public class SpillingQueue<M extends Wri
   public int size() {
     return numMessagesWritten;
   }
-  
 
   @Override
   public boolean isMessageSerialized() {
     return true;
   }
 
+  @Override
+  public MessageQueue<M> getSenderQueue() {
+    return this;
+  }
+
+  @Override
+  public MessageQueue<M> getReceiverQueue() {
+    return this;
+  }
 
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Thu Feb 14 10:59:41 2013
@@ -17,36 +17,12 @@
  */
 package org.apache.hama.bsp.message.queue;
 
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * Synchronized Queue interface. Can be used to implement better synchronized
  * datastructures.
  */
-public interface SynchronizedQueue<T> extends Configurable {
-
-  public abstract Iterator<T> iterator();
-
-  public abstract void init(Configuration conf, TaskAttemptID id);
-
-  public abstract void close();
-
-  public abstract void prepareRead();
-
-  public abstract void addAll(Collection<T> col);
-
-  public abstract void add(T item);
-
-  public abstract void clear();
-
-  public abstract Object poll();
-
-  public abstract int size();
+public interface SynchronizedQueue<T> extends MessageQueue<T>{
 
   public abstract MessageQueue<T> getMessageQueue();