You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/08/12 04:25:44 UTC
svn commit: r1617401 - in /hama/trunk/core/src:
main/java/org/apache/hama/bsp/ main/java/org/apache/hama/bsp/message/
main/java/org/apache/hama/bsp/message/io/
main/java/org/apache/hama/bsp/message/queue/ test/java/org/apache/hama/bsp/
test/java/org/apache/hama/bsp/message/
Author: edwardyoon
Date: Tue Aug 12 02:25:44 2014
New Revision: 1617401
URL: http://svn.apache.org/r1617401
Log:
cleanup queue impls
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageIO.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Tue Aug 12 02:25:44 2014
@@ -20,7 +20,6 @@ package org.apache.hama.bsp;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
@@ -356,12 +355,9 @@ public class LocalBSPRunner implements J
bundle.setCompressor(compressor,
conf.getLong("hama.messenger.compression.threshold", 512));
- Iterator<M> it = bundle.iterator();
- while (it.hasNext()) {
- MANAGER_MAP.get(addr).localQueueForNextIteration.add(it.next());
- peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
- 1L);
- }
+ MANAGER_MAP.get(addr).localQueueForNextIteration.add(bundle);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+ bundle.size());
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Tue Aug 12 02:25:44 2014
@@ -232,7 +232,12 @@ public abstract class AbstractMessageMan
}
protected SynchronizedQueue<M> getSynchronizedReceiverQueue() {
- return SingleLockQueue.synchronize(getReceiverQueue());
+ MessageQueue<M> queue = getReceiverQueue();
+ if (queue.isMemoryBasedQueue()) {
+ return (SynchronizedQueue<M>) queue;
+ }
+
+ return new SingleLockQueue<M>(queue);
}
@Override
@@ -281,11 +286,9 @@ public abstract class AbstractMessageMan
public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
bundle.setCompressor(compressor,
conf.getLong("hama.messenger.compression.threshold", 128));
-
- Iterator<? extends Writable> it = bundle.iterator();
- while (it.hasNext()) {
- loopBackMessage(it.next());
- }
+ this.localQueueForNextIteration.add(bundle);
+ peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED,
+ bundle.size());
}
@SuppressWarnings("unchecked")
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Tue Aug 12 02:25:44 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
@@ -45,7 +46,7 @@ import org.apache.hama.bsp.TaskAttemptID
* configuration. <br/>
* <b>It is experimental to use.</b>
*/
-public final class DiskQueue<M extends Writable> extends DefaultMessageQueue<M> {
+public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
@@ -170,7 +171,14 @@ public final class DiskQueue<M extends W
}
@Override
+ public void add(BSPMessageBundle<M> bundle){
+ addAll(bundle);
+ }
+
+ @Override
public final void addAll(Iterable<M> col) {
+ // TODO Write bundle object directly
+
for (M item : col) {
add(item);
}
@@ -313,4 +321,5 @@ public final class DiskQueue<M extends W
public boolean isMemoryBasedQueue() {
return false;
}
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Tue Aug 12 02:25:44 2014
@@ -22,54 +22,104 @@ import java.util.concurrent.ConcurrentLi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* LinkedList backed queue structure for bookkeeping messages.
*/
public final class MemoryQueue<M extends Writable> implements
- SynchronizedQueue<M> {
+ SynchronizedQueue<M>, MessageQueue<M> {
+
+ private final ConcurrentLinkedQueue<M> messages = new ConcurrentLinkedQueue<M>();
+ private final ConcurrentLinkedQueue<BSPMessageBundle<M>> bundles = new ConcurrentLinkedQueue<BSPMessageBundle<M>>();
+ private Iterator<M> bundleIterator;
+
+ int bundledMessageSize = 0;
- private final ConcurrentLinkedQueue<M> deque = new ConcurrentLinkedQueue<M>();
private Configuration conf;
@Override
public final void addAll(Iterable<M> col) {
for (M m : col)
- deque.add(m);
+ messages.add(m);
}
@Override
public void addAll(MessageQueue<M> otherqueue) {
M poll = null;
while ((poll = otherqueue.poll()) != null) {
- deque.add(poll);
+ messages.add(poll);
}
}
@Override
public final void add(M item) {
- deque.add(item);
+ messages.add(item);
+ }
+
+ @Override
+ public void add(BSPMessageBundle<M> bundle) {
+ bundledMessageSize += bundle.size();
+ bundles.add(bundle);
}
@Override
public final void clear() {
- deque.clear();
+ messages.clear();
+ bundles.clear();
+ bundleIterator = null;
}
@Override
public final M poll() {
- return deque.poll();
+ if (messages.size() > 0) {
+ return messages.poll();
+ } else {
+ if (bundles.size() > 0) {
+ if (bundleIterator == null) {
+ bundleIterator = bundles.poll().iterator();
+ } else {
+ if (!bundleIterator.hasNext()) {
+ bundleIterator = bundles.poll().iterator();
+ }
+ }
+
+ bundledMessageSize--;
+ return bundleIterator.next();
+ }
+ }
+
+ return null;
}
@Override
public final int size() {
- return deque.size();
+ return messages.size() + bundledMessageSize;
}
@Override
public final Iterator<M> iterator() {
- return deque.iterator();
+ Iterator<M> it = new Iterator<M>() {
+
+ @Override
+ public boolean hasNext() {
+ if (size() > 0)
+ return true;
+ else
+ return false;
+ }
+
+ @Override
+ public M next() {
+ return poll();
+ }
+
+ @Override
+ public void remove() {
+ }
+ };
+ return it;
}
@Override
@@ -117,4 +167,5 @@ public final class MemoryQueue<M extends
public MessageQueue<M> getMessageQueue() {
return this;
}
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Tue Aug 12 02:25:44 2014
@@ -19,17 +19,23 @@ package org.apache.hama.bsp.message.queu
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* Simple queue interface.
*/
-public interface MessageQueue<M> extends Iterable<M>, Configurable {
+public interface MessageQueue<M extends Writable> extends Iterable<M>,
+ Configurable {
public static final String PERSISTENT_QUEUE = "hama.queue.behaviour.persistent";
/**
* Used to initialize the queue.
+ *
+ * @param conf
+ * @param id
*/
public void init(Configuration conf, TaskAttemptID id);
@@ -50,20 +56,33 @@ public interface MessageQueue<M> extends
/**
* Adds a whole Java Collection to the implementing queue.
+ *
+ * @param col
*/
public void addAll(Iterable<M> col);
/**
* Adds the other queue to this queue.
+ *
+ * @param otherqueue
*/
public void addAll(MessageQueue<M> otherqueue);
/**
* Adds a single item to the implementing queue.
+ *
+ * @param item
*/
public void add(M item);
/**
+ * Adds a bundle to the queue.
+ *
+ * @param bundle
+ */
+ public void add(BSPMessageBundle<M> bundle);
+
+ /**
* Clears all entries in the given queue.
*/
public void clear();
@@ -85,7 +104,10 @@ public interface MessageQueue<M> extends
* @return true if the messages in the queue are serialized to byte buffers.
*/
public boolean isMessageSerialized();
-
+
+ /**
+ * @return true if the queue is memory resident.
+ */
public boolean isMemoryBasedQueue();
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Tue Aug 12 02:25:44 2014
@@ -20,22 +20,24 @@ package org.apache.hama.bsp.message.queu
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* A global mutex based synchronized queue.
*/
-public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+public final class SingleLockQueue<T extends Writable> implements SynchronizedQueue<T> {
private final MessageQueue<T> queue;
private final Object mutex;
- private SingleLockQueue(MessageQueue<T> queue) {
+ public SingleLockQueue(MessageQueue<T> queue) {
this.queue = queue;
this.mutex = new Object();
}
- private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+ public SingleLockQueue(MessageQueue<T> queue, Object mutex) {
this.queue = queue;
this.mutex = mutex;
}
@@ -178,22 +180,6 @@ public final class SingleLockQueue<T> im
}
}
- /*
- * static constructor methods to be type safe
- */
- public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
- if(queue.isMemoryBasedQueue()) {
- return (SynchronizedQueue<T>) queue;
- }
-
- return new SingleLockQueue<T>(queue);
- }
-
- public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
- Object mutex) {
- return new SingleLockQueue<T>(queue, mutex);
- }
-
@Override
public void prepareWrite() {
synchronized (mutex) {
@@ -219,4 +205,9 @@ public final class SingleLockQueue<T> im
public boolean isMemoryBasedQueue() {
return true;
}
+
+ @Override
+ public void add(BSPMessageBundle<T> bundle) {
+ queue.add(bundle);
+ }
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Tue Aug 12 02:25:44 2014
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
* sorted receive and send.
*/
public final class SortedMemoryQueue<M extends WritableComparable<M>>
- implements SynchronizedQueue<M>, BSPMessageInterface<M> {
+ implements SynchronizedQueue<M>, MessageQueue<M> {
private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
private Configuration conf;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Tue Aug 12 02:25:44 2014
@@ -17,11 +17,13 @@
*/
package org.apache.hama.bsp.message.queue;
+import org.apache.hadoop.io.Writable;
+
/**
* Synchronized Queue interface. Can be used to implement better synchronized
* datastructures.
*/
-public interface SynchronizedQueue<T> extends MessageQueue<T> {
+public interface SynchronizedQueue<T extends Writable> extends MessageQueue<T> {
public abstract MessageQueue<T> getMessageQueue();
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Tue Aug 12 02:25:44 2014
@@ -37,14 +37,6 @@ public class TestPersistQueue extends Te
public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
- public void testDiskQueue() throws Exception {
- BSPJob bsp = getNewJobConf();
- bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.DiskQueue");
-
- assertTrue(bsp.waitForCompletion(true));
- }
-
public void testMemoryQueue() throws Exception {
BSPJob bsp = getNewJobConf();
bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
@@ -61,10 +53,10 @@ public class TestPersistQueue extends Te
assertTrue(bsp.waitForCompletion(true));
}
- public void testSpillingQueue() throws Exception {
+ public void testDiskQueue() throws Exception {
BSPJob bsp = getNewJobConf();
bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.SpillingQueue");
+ "org.apache.hama.bsp.message.queue.DiskQueue");
assertTrue(bsp.waitForCompletion(true));
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1617401&r1=1617400&r2=1617401&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Tue Aug 12 02:25:44 2014
@@ -49,7 +49,6 @@ public class TestHamaMessageManager exte
HamaConfiguration conf = new HamaConfiguration();
conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
MessageQueue.class);
- conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
messagingInternal(conf);
}
@@ -60,7 +59,7 @@ public class TestHamaMessageManager exte
MessageQueue.class);
messagingInternal(conf);
}
-
+
private static void messagingInternal(HamaConfiguration conf)
throws Exception {
conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
@@ -102,10 +101,10 @@ public class TestHamaMessageManager exte
}
messageManager.transfer(peer, bundle);
-
+
messageManager.clearOutgoingMessages();
- assertTrue(messageManager.getNumCurrentMessages() == 1);
+ assertEquals(messageManager.getNumCurrentMessages(), 1);
IntWritable currentMessage = messageManager.getCurrentMessage();
assertEquals(currentMessage.get(), 1337);