You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/02/14 11:59:42 UTC
svn commit: r1446114 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/message/
core/src/main/java/org/apache/hama/bsp/message/queue/
Author: surajsmenon
Date: Thu Feb 14 10:59:41 2013
New Revision: 1446114
URL: http://svn.apache.org/r1446114
Log:
[HAMA-722] Committing a minor refactor for the purpose
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Feb 14 10:59:41 2013
@@ -4,6 +4,7 @@ Release 0.7 (unreleased changes)
NEW FEATURES
+ HAMA-722: Messaging queue should construct sender and receiver queue. (surajsmenon)
HAMA-721: Added spilling queue with combiner. (surajsmenon)
HAMA-559: Added spilling queue. (surajsmenon)
HAMA-700: BSPPartitioner should be configurable to be optional and allow input format conversion (surajsmenon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Feb 14 10:59:41 2013
@@ -40,6 +40,7 @@ import org.apache.hama.bsp.TaskAttemptID
import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageTransferQueue;
import org.apache.hama.bsp.message.queue.SingleLockQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
import org.apache.hama.util.BSPNetUtils;
@@ -59,7 +60,9 @@ public abstract class AbstractMessageMan
protected Configuration conf;
protected final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
protected final HashMap<InetSocketAddress, MessageQueue<M>> outgoingQueues = new HashMap<InetSocketAddress, MessageQueue<M>>();
+
protected MessageQueue<M> localQueue;
+
// this must be a synchronized implementation: this is accessed per RPC
protected SynchronizedQueue<M> localQueueForNextIteration;
// this peer object is just used for counter incrementation
@@ -89,8 +92,8 @@ public abstract class AbstractMessageMan
this.peer = peer;
this.conf = conf;
this.peerAddress = peerAddress;
- this.localQueue = getQueue();
- this.localQueueForNextIteration = getSynchronizedQueue();
+ this.localQueue = getSenderQueue();
+ this.localQueueForNextIteration = getSynchronizedReceiverQueue();
this.maxCachedConnections = conf.getInt(MAX_CACHED_CONNECTIONS_KEY, 100);
}
@@ -158,7 +161,7 @@ public abstract class AbstractMessageMan
public final void clearOutgoingQueues() {
localQueue = localQueueForNextIteration.getMessageQueue();
localQueue.prepareRead();
- localQueueForNextIteration = getSynchronizedQueue();
+ localQueueForNextIteration = getSynchronizedReceiverQueue();
notifyInit();
}
@@ -179,7 +182,7 @@ public abstract class AbstractMessageMan
}
MessageQueue<M> queue = outgoingQueues.get(targetPeerAddress);
if (queue == null) {
- queue = getQueue();
+ queue = getSenderQueue();
}
queue.add(msg);
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_SENT, 1L);
@@ -204,18 +207,42 @@ public abstract class AbstractMessageMan
*
* @return a <b>new</b> queue implementation.
*/
- protected MessageQueue<M> getQueue() {
+ protected MessageQueue<M> getSenderQueue() {
+ Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
+ LOG.debug("Creating new " + queueClass);
+ @SuppressWarnings("unchecked")
+ MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
+ .newInstance(queueClass, conf);
+ MessageQueue<M> queue = newInstance.getSenderQueue();
+ queue.init(conf, attemptId);
+ return queue;
+ }
+
+ /**
+ * Returns a new queue implementation based on what was configured. If nothing
+ * has been configured for "hama.messenger.queue.class" then the
+ * {@link MemoryQueue} is used. If you have scalability issues, then better
+ * use {@link DiskQueue}.
+ *
+ * @return a <b>new</b> queue implementation.
+ */
+ protected MessageQueue<M> getReceiverQueue() {
Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
LOG.debug("Creating new " + queueClass);
@SuppressWarnings("unchecked")
- MessageQueue<M> newInstance = (MessageQueue<M>) ReflectionUtils
+ MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
.newInstance(queueClass, conf);
- newInstance.init(conf, attemptId);
- return newInstance;
+ MessageQueue<M> queue = newInstance.getReceiverQueue();
+ queue.init(conf, attemptId);
+ return queue;
}
- protected SynchronizedQueue<M> getSynchronizedQueue() {
- return SingleLockQueue.synchronize(getQueue());
+ protected SynchronizedQueue<M> getSynchronizedSenderQueue() {
+ return SingleLockQueue.synchronize(getSenderQueue());
+ }
+
+ protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
+ return SingleLockQueue.synchronize(getReceiverQueue());
}
@Override
@@ -229,27 +256,27 @@ public abstract class AbstractMessageMan
}
private void notifySentMessage(String peerName, M message) {
- for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
- aMessageListenerQueue.onMessageSent(peerName, message);
- }
+ for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+ aMessageListenerQueue.onMessageSent(peerName, message);
+ }
}
private void notifyReceivedMessage(M message) throws IOException {
- for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
- aMessageListenerQueue.onMessageReceived(message);
- }
+ for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+ aMessageListenerQueue.onMessageReceived(message);
+ }
}
private void notifyInit() {
- for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
- aMessageListenerQueue.onInitialized();
- }
+ for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+ aMessageListenerQueue.onInitialized();
+ }
}
private void notifyClose() {
- for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
- aMessageListenerQueue.onClose();
- }
+ for (MessageEventListener<M> aMessageListenerQueue : this.messageListenerQueue) {
+ aMessageListenerQueue.onClose();
+ }
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Thu Feb 14 10:59:41 2013
@@ -46,7 +46,7 @@ import org.apache.hama.bsp.TaskAttemptID
* configuration. <br/>
* <b>It is experimental to use.</b>
*/
-public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
+public final class DiskQueue<M extends Writable> implements MessageQueue<M>, MessageTransferQueue<M> {
public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
@@ -310,4 +310,14 @@ public final class DiskQueue<M extends W
return false;
}
+ @Override
+ public MessageQueue<M> getSenderQueue() {
+ return this;
+ }
+
+ @Override
+ public MessageQueue<M> getReceiverQueue() {
+ return this;
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Thu Feb 14 10:59:41 2013
@@ -29,7 +29,7 @@ import org.apache.hama.bsp.TaskAttemptID
/**
* LinkedList backed queue structure for bookkeeping messages.
*/
-public final class MemoryQueue<M extends Writable> implements MessageQueue<M> {
+public final class MemoryQueue<M extends Writable> implements MessageQueue<M>, MessageTransferQueue<M> {
private final Deque<M> deque = new ArrayDeque<M>();
private Configuration conf;
@@ -108,4 +108,14 @@ public final class MemoryQueue<M extends
return false;
}
+ @Override
+ public MessageQueue<M> getSenderQueue() {
+ return this;
+ }
+
+ @Override
+ public MessageQueue<M> getReceiverQueue() {
+ return this;
+ }
+
}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java?rev=1446114&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java Thu Feb 14 10:59:41 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+/**
+ *
+ *
+ * @param <M>
+ */
+public interface MessageTransferQueue<M> {
+
+ /**
+ *
+ */
+ public MessageQueue<M> getSenderQueue();
+
+ /**
+ *
+ */
+ public MessageQueue<M> getReceiverQueue();
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Thu Feb 14 10:59:41 2013
@@ -150,7 +150,7 @@ public final class SingleLockQueue<T> im
* @see org.apache.hama.bsp.message.SynchronizedQueue#poll()
*/
@Override
- public Object poll() {
+ public T poll() {
synchronized (mutex) {
return queue.poll();
}
@@ -190,4 +190,25 @@ public final class SingleLockQueue<T> im
Object mutex) {
return new SingleLockQueue<T>(queue, mutex);
}
+
+ @Override
+ public void prepareWrite() {
+ synchronized (mutex) {
+ queue.prepareWrite();
+ }
+ }
+
+ @Override
+ public void addAll(MessageQueue<T> otherqueue) {
+ synchronized (mutex) {
+ queue.addAll(otherqueue);
+ }
+ }
+
+ @Override
+ public boolean isMessageSerialized() {
+ synchronized (mutex) {
+ return queue.isMessageSerialized();
+ }
+ }
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Thu Feb 14 10:59:41 2013
@@ -30,7 +30,7 @@ import org.apache.hama.bsp.TaskAttemptID
* sorted receive and send.
*/
public final class SortedMessageQueue<M extends WritableComparable<M>>
- implements MessageQueue<M> {
+ implements MessageQueue<M>, MessageTransferQueue<M> {
private final PriorityQueue<M> queue = new PriorityQueue<M>();
private Configuration conf;
@@ -110,4 +110,14 @@ public final class SortedMessageQueue<M
return false;
}
+ @Override
+ public MessageQueue<M> getSenderQueue() {
+ return this;
+ }
+
+ @Override
+ public MessageQueue<M> getReceiverQueue() {
+ return this;
+ }
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Thu Feb 14 10:59:41 2013
@@ -44,7 +44,8 @@ import org.apache.hama.bsp.message.io.Sp
*
* @param <M>
*/
-public class SpillingQueue<M extends Writable> implements MessageQueue<M> {
+public class SpillingQueue<M extends Writable> implements MessageQueue<M>,
+ MessageTransferQueue<M> {
private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
@@ -152,9 +153,9 @@ public class SpillingQueue<M extends Wri
@Override
public void addAll(MessageQueue<M> arg0) {
- for (M anArg0 : arg0) {
- add(anArg0);
- }
+ for (M anArg0 : arg0) {
+ add(anArg0);
+ }
}
@Override
@@ -334,12 +335,20 @@ public class SpillingQueue<M extends Wri
public int size() {
return numMessagesWritten;
}
-
@Override
public boolean isMessageSerialized() {
return true;
}
+ @Override
+ public MessageQueue<M> getSenderQueue() {
+ return this;
+ }
+
+ @Override
+ public MessageQueue<M> getReceiverQueue() {
+ return this;
+ }
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1446114&r1=1446113&r2=1446114&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Thu Feb 14 10:59:41 2013
@@ -17,36 +17,12 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.TaskAttemptID;
/**
* Synchronized Queue interface. Can be used to implement better synchronized
* datastructures.
*/
-public interface SynchronizedQueue<T> extends Configurable {
-
- public abstract Iterator<T> iterator();
-
- public abstract void init(Configuration conf, TaskAttemptID id);
-
- public abstract void close();
-
- public abstract void prepareRead();
-
- public abstract void addAll(Collection<T> col);
-
- public abstract void add(T item);
-
- public abstract void clear();
-
- public abstract Object poll();
-
- public abstract int size();
+public interface SynchronizedQueue<T> extends MessageQueue<T>{
public abstract MessageQueue<T> getMessageQueue();