You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2015/02/12 03:20:00 UTC
svn commit: r1659134 - in /hama/trunk: core/src/main/java/org/apache/hama/
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/message/
core/src/main/java/org/apache/hama/bsp/message/queue/
core/src/test/java/org/apache/hama/...
Author: edwardyoon
Date: Thu Feb 12 02:19:59 2015
New Revision: 1659134
URL: http://svn.apache.org/r1659134
Log:
HAMA-903: Refactor Queue interface and implementations
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestSpillingQueue.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/Constants.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/Constants.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/Constants.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/Constants.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/Constants.java Thu Feb 12 02:19:59 2015
@@ -162,7 +162,9 @@ public interface Constants {
static final String CLUSTER_IS_DISTRIBUTED = "true";
// Other constants
-
+ static final String MESSENGER_RUNTIME_COMPRESSION = "hama.messenger.runtime.compression";
+ static final String MESSENGER_COMPRESSION_THRESHOLD = "hama.messenger.compression.threshold";
+
/**
* An empty instance.
*/
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Thu Feb 12 02:19:59 2015
@@ -79,9 +79,8 @@ public class BSPMessageBundle<M extends
className = message.getClass().getName();
}
- msgBytes = serialize(message);
-
if (compressor != null) {
+ msgBytes = serialize(message);
if (msgBytes.length > threshold) {
bufferDos.writeBoolean(true);
msgBytes = compressor.compress(msgBytes);
@@ -90,7 +89,7 @@ public class BSPMessageBundle<M extends
bufferDos.writeBoolean(false);
}
}
- bufferDos.write(msgBytes);
+ message.write(bufferDos);
bundleSize++;
} catch (IOException e) {
LOG.error(e);
@@ -134,13 +133,8 @@ public class BSPMessageBundle<M extends
}
msg = ReflectionUtils.newInstance(clazz, null);
- boolean isCompressed = false;
-
- if (compressor != null) {
- isCompressed = dis.readBoolean();
- }
- if (isCompressed) {
+ if (compressor != null && dis.readBoolean()) {
int length = dis.readInt();
msgBytes = new byte[length];
dis.readFully(msgBytes);
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 12 02:19:59 2015
@@ -368,8 +368,8 @@ public final class BSPPeerImpl<K1, V1, K
InterruptedException {
// normally all messages should been send now, finalizing the send phase
- Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> it = messenger
- .getOutgoingBundles();
+ Iterator<Entry<InetSocketAddress, BSPMessageBundle<M>>> it;
+ it = messenger.getOutgoingBundles();
while (it.hasNext()) {
Entry<InetSocketAddress, BSPMessageBundle<M>> entry = it.next();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Thu Feb 12 02:19:59 2015
@@ -354,9 +354,9 @@ public class LocalBSPRunner implements J
peer.incrementCounter(BSPPeerImpl.PeerCounter.MESSAGE_BYTES_TRANSFERED,
bundle.getLength());
- if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 512));
+ conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 512));
}
Iterator<M> it = bundle.iterator();
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Thu Feb 12 02:19:59 2015
@@ -28,8 +28,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
@@ -37,7 +37,6 @@ import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
-import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SingleLockQueue;
@@ -108,14 +107,6 @@ public abstract class AbstractMessageMan
try {
outgoingMessageManager.clear();
localQueue.close();
- // remove possible disk queues from the path
- try {
- FileSystem.get(conf).delete(
- DiskQueue.getQueueDir(conf, attemptId,
- conf.get(DiskQueue.DISK_QUEUE_PATH_KEY)), true);
- } catch (IOException e) {
- LOG.warn("Queue dir couldn't be deleted");
- }
} finally {
notifyClose();
}
@@ -151,9 +142,6 @@ public abstract class AbstractMessageMan
if (conf.getBoolean(MessageQueue.PERSISTENT_QUEUE, false)
&& localQueue.size() > 0) {
- if (localQueue.isMemoryBasedQueue()
- && localQueueForNextIteration.isMemoryBasedQueue()) {
-
// To reduce the number of element additions
if (localQueue.size() > localQueueForNextIteration.size()) {
localQueue.addAll(localQueueForNextIteration);
@@ -162,16 +150,6 @@ public abstract class AbstractMessageMan
localQueue = localQueueForNextIteration.getMessageQueue();
}
- } else {
-
- // TODO find the way to switch disk-based queue efficiently.
- localQueueForNextIteration.addAll(localQueue);
- if (localQueue != null) {
- localQueue.close();
- }
- localQueue = localQueueForNextIteration.getMessageQueue();
-
- }
} else {
if (localQueue != null) {
localQueue.close();
@@ -180,7 +158,6 @@ public abstract class AbstractMessageMan
localQueue = localQueueForNextIteration.getMessageQueue();
}
- localQueue.prepareRead();
localQueueForNextIteration = getSynchronizedReceiverQueue();
notifyInit();
}
@@ -279,17 +256,13 @@ public abstract class AbstractMessageMan
@Override
public void loopBackBundle(BSPMessageBundle<M> bundle) throws IOException {
- if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
+ conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
}
peer.incrementCounter(BSPPeerImpl.PeerCounter.TOTAL_MESSAGES_RECEIVED, bundle.size());
-
- Iterator<? extends Writable> it = bundle.iterator();
- while (it.hasNext()) {
- loopBackMessage(it.next());
- }
+ this.localQueueForNextIteration.addBundle(bundle);
}
@SuppressWarnings("unchecked")
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractOutgoingMessageManager.java Thu Feb 12 02:19:59 2015
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.message.compress.BSPMessageCompressor;
@@ -47,9 +48,9 @@ public abstract class AbstractOutgoingMe
if (!outgoingBundles.containsKey(targetPeerAddress)) {
BSPMessageBundle<M> bundle = new BSPMessageBundle<M>();
- if (conf.getBoolean("hama.messenger.runtime.compression", false)) {
+ if (conf.getBoolean(Constants.MESSENGER_RUNTIME_COMPRESSION, false)) {
bundle.setCompressor(compressor,
- conf.getLong("hama.messenger.compression.threshold", 128));
+ conf.getLong(Constants.MESSENGER_COMPRESSION_THRESHOLD, 128));
}
outgoingBundles.put(targetPeerAddress, bundle);
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Thu Feb 12 02:19:59 2015
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentLi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
@@ -31,9 +32,19 @@ public final class MemoryQueue<M extends
SynchronizedQueue<M> {
private final ConcurrentLinkedQueue<M> deque = new ConcurrentLinkedQueue<M>();
+ private final ConcurrentLinkedQueue<BSPMessageBundle<M>> bundles = new ConcurrentLinkedQueue<BSPMessageBundle<M>>();
+ private int numOfMsg = 0;
+ private Iterator<M> currIterator;
+
private Configuration conf;
@Override
+ public void addBundle(BSPMessageBundle<M> bundle) {
+ numOfMsg += bundle.size();
+ bundles.add(bundle);
+ }
+
+ @Override
public final void addAll(Iterable<M> col) {
for (M m : col)
deque.add(m);
@@ -59,12 +70,20 @@ public final class MemoryQueue<M extends
@Override
public final M poll() {
- return deque.poll();
+ if (currIterator == null || !currIterator.hasNext()) {
+ if (bundles.size() > 0)
+ currIterator = bundles.poll().iterator();
+ else
+ return deque.poll();
+ }
+
+ numOfMsg--;
+ return currIterator.next();
}
@Override
public final int size() {
- return deque.size();
+ return numOfMsg + deque.size();
}
@Override
@@ -82,10 +101,10 @@ public final class MemoryQueue<M extends
return conf;
}
- // not doing much here
@Override
public void init(Configuration conf, TaskAttemptID id) {
-
+ this.numOfMsg = 0;
+ this.conf = conf;
}
@Override
@@ -94,27 +113,8 @@ public final class MemoryQueue<M extends
}
@Override
- public void prepareRead() {
-
- }
-
- @Override
- public void prepareWrite() {
-
- }
-
- @Override
- public boolean isMessageSerialized() {
- return false;
- }
-
- @Override
- public boolean isMemoryBasedQueue() {
- return true;
- }
-
- @Override
public MessageQueue<M> getMessageQueue() {
return this;
}
+
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Thu Feb 12 02:19:59 2015
@@ -19,12 +19,15 @@ package org.apache.hama.bsp.message.queu
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* Simple queue interface.
*/
-public interface MessageQueue<M> extends Iterable<M>, Configurable {
+public interface MessageQueue<M extends Writable> extends Iterable<M>,
+ Configurable {
public static final String PERSISTENT_QUEUE = "hama.queue.behaviour.persistent";
@@ -39,16 +42,6 @@ public interface MessageQueue<M> extends
public void close();
/**
- * Called to prepare a queue for reading.
- */
- public void prepareRead();
-
- /**
- * Called to prepare a queue for writing.
- */
- public void prepareWrite();
-
- /**
* Adds a whole Java Collection to the implementing queue.
*/
public void addAll(Iterable<M> col);
@@ -59,6 +52,13 @@ public interface MessageQueue<M> extends
public void addAll(MessageQueue<M> otherqueue);
/**
+ * Adds the received bundle
+ *
+ * @param bundle
+ */
+ public void addBundle(BSPMessageBundle<M> bundle);
+
+ /**
* Adds a single item to the implementing queue.
*/
public void add(M item);
@@ -80,12 +80,4 @@ public interface MessageQueue<M> extends
*/
public int size();
- /**
- *
- * @return true if the messages in the queue are serialized to byte buffers.
- */
- public boolean isMessageSerialized();
-
- public boolean isMemoryBasedQueue();
-
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Thu Feb 12 02:19:59 2015
@@ -20,12 +20,15 @@ package org.apache.hama.bsp.message.queu
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
/**
* A global mutex based synchronized queue.
*/
-public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+public final class SingleLockQueue<T extends Writable> implements
+ SynchronizedQueue<T> {
private final MessageQueue<T> queue;
private final Object mutex;
@@ -101,17 +104,6 @@ public final class SingleLockQueue<T> im
/*
* (non-Javadoc)
- * @see org.apache.hama.bsp.message.SynchronizedQueue#prepareRead()
- */
- @Override
- public void prepareRead() {
- synchronized (mutex) {
- queue.prepareRead();
- }
- }
-
- /*
- * (non-Javadoc)
* @see
* org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection)
*/
@@ -134,6 +126,13 @@ public final class SingleLockQueue<T> im
}
}
+ @Override
+ public void addBundle(BSPMessageBundle<T> bundle) {
+ synchronized (mutex) {
+ queue.addBundle(bundle);
+ }
+ }
+
/*
* (non-Javadoc)
* @see org.apache.hama.bsp.message.SynchronizedQueue#clear()
@@ -181,42 +180,20 @@ public final class SingleLockQueue<T> im
/*
* static constructor methods to be type safe
*/
- public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
- if(queue.isMemoryBasedQueue()) {
- return (SynchronizedQueue<T>) queue;
- }
-
- return new SingleLockQueue<T>(queue);
+ public static <T extends Writable> SynchronizedQueue<T> synchronize(
+ MessageQueue<T> queue) {
+ return (SynchronizedQueue<T>) queue;
}
- public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
- Object mutex) {
+ public static <T extends Writable> SynchronizedQueue<T> synchronize(
+ MessageQueue<T> queue, Object mutex) {
return new SingleLockQueue<T>(queue, mutex);
}
@Override
- public void prepareWrite() {
- synchronized (mutex) {
- queue.prepareWrite();
- }
- }
-
- @Override
public void addAll(MessageQueue<T> otherqueue) {
synchronized (mutex) {
queue.addAll(otherqueue);
}
}
-
- @Override
- public boolean isMessageSerialized() {
- synchronized (mutex) {
- return queue.isMessageSerialized();
- }
- }
-
- @Override
- public boolean isMemoryBasedQueue() {
- return true;
- }
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Thu Feb 12 02:19:59 2015
@@ -31,7 +31,7 @@ import org.apache.hama.bsp.TaskAttemptID
* sorted receive and send.
*/
public final class SortedMemoryQueue<M extends WritableComparable<M>>
- implements SynchronizedQueue<M>, BSPMessageInterface<M> {
+ implements SynchronizedQueue<M> {
private final BlockingQueue<M> queue = new PriorityBlockingQueue<M>();
private Configuration conf;
@@ -52,6 +52,11 @@ public final class SortedMemoryQueue<M e
}
@Override
+ public void addBundle(BSPMessageBundle<M> bundle) {
+ addAll(bundle);
+ }
+
+ @Override
public void addAll(Iterable<M> col) {
for (M m : col)
queue.add(m);
@@ -86,10 +91,9 @@ public final class SortedMemoryQueue<M e
}
// empty, not needed to implement
-
@Override
public void init(Configuration conf, TaskAttemptID id) {
-
+ this.conf = conf;
}
@Override
@@ -98,31 +102,6 @@ public final class SortedMemoryQueue<M e
}
@Override
- public void prepareRead() {
-
- }
-
- @Override
- public void prepareWrite() {
-
- }
-
- @Override
- public boolean isMessageSerialized() {
- return false;
- }
-
- @Override
- public void add(BSPMessageBundle<M> bundle) {
- addAll(bundle);
- }
-
- @Override
- public boolean isMemoryBasedQueue() {
- return true;
- }
-
- @Override
public MessageQueue<M> getMessageQueue() {
return this;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Thu Feb 12 02:19:59 2015
@@ -17,11 +17,13 @@
*/
package org.apache.hama.bsp.message.queue;
+import org.apache.hadoop.io.Writable;
+
/**
* Synchronized Queue interface. Can be used to implement better synchronized
* datastructures.
*/
-public interface SynchronizedQueue<T> extends MessageQueue<T> {
+public interface SynchronizedQueue<T extends Writable> extends MessageQueue<T> {
public abstract MessageQueue<T> getMessageQueue();
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Thu Feb 12 02:19:59 2015
@@ -32,7 +32,6 @@ import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.message.compress.SnappyCompressor;
-import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.examples.ClassSerializePrinting;
public class TestBSPMasterGroomServer extends HamaCluster {
@@ -55,7 +54,6 @@ public class TestBSPMasterGroomServer ex
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
configuration.set("bsp.local.dir", "/tmp/hama-test");
- configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
configuration.set("hama.sync.client.class",
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPartitioning.java Thu Feb 12 02:19:59 2015
@@ -30,7 +30,6 @@ import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.commons.util.KeyValuePair;
@@ -54,7 +53,6 @@ public class TestPartitioning extends Ha
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
configuration.set("bsp.local.dir", "/tmp/hama-test");
- configuration.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
configuration.set("hama.sync.client.class",
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestPersistQueue.java Thu Feb 12 02:19:59 2015
@@ -37,14 +37,6 @@ public class TestPersistQueue extends Te
public static final Log LOG = LogFactory.getLog(TestPartitioning.class);
- public void testDiskQueue() throws Exception {
- BSPJob bsp = getNewJobConf();
- bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.DiskQueue");
-
- assertTrue(bsp.waitForCompletion(true));
- }
-
public void testMemoryQueue() throws Exception {
BSPJob bsp = getNewJobConf();
bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
@@ -60,14 +52,6 @@ public class TestPersistQueue extends Te
assertTrue(bsp.waitForCompletion(true));
}
-
- public void testSpillingQueue() throws Exception {
- BSPJob bsp = getNewJobConf();
- bsp.set(MessageManager.RECEIVE_QUEUE_TYPE_CLASS,
- "org.apache.hama.bsp.message.queue.SpillingQueue");
-
- assertTrue(bsp.waitForCompletion(true));
- }
public BSPJob getNewJobConf() throws Exception {
Configuration conf = new Configuration();
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaAsyncMessageManager.java Thu Feb 12 02:19:59 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.util.BSPNetUtils;
@@ -49,15 +48,6 @@ public class TestHamaAsyncMessageManager
HamaConfiguration conf = new HamaConfiguration();
conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
MessageQueue.class);
- conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
- messagingInternal(conf);
- }
-
- public void testDiskMessaging() throws Exception {
- HamaConfiguration conf = new HamaConfiguration();
- conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
- conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
- MessageQueue.class);
messagingInternal(conf);
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHamaMessageManager.java Thu Feb 12 02:19:59 2015
@@ -33,7 +33,6 @@ import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.util.BSPNetUtils;
@@ -49,15 +48,6 @@ public class TestHamaMessageManager exte
HamaConfiguration conf = new HamaConfiguration();
conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
MessageQueue.class);
- conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
- messagingInternal(conf);
- }
-
- public void testDiskMessaging() throws Exception {
- HamaConfiguration conf = new HamaConfiguration();
- conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
- conf.setClass(MessageManager.RECEIVE_QUEUE_TYPE_CLASS, DiskQueue.class,
- MessageQueue.class);
messagingInternal(conf);
}
Modified: hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/pipes/TestPipes.java Thu Feb 12 02:19:59 2015
@@ -49,7 +49,6 @@ import org.apache.hama.bsp.NullInputForm
import org.apache.hama.bsp.SequenceFileInputFormat;
import org.apache.hama.bsp.SequenceFileOutputFormat;
import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.commons.io.PipesKeyValueWritable;
import org.apache.hama.commons.io.PipesVectorWritable;
import org.apache.hama.commons.math.DenseDoubleVector;
@@ -94,8 +93,6 @@ public class TestPipes extends HamaClust
assertEquals("Make sure master addr is set to localhost:", "localhost",
configuration.get("bsp.master.address"));
configuration.set("bsp.local.dir", HAMA_TMP_OUTPUT);
- configuration
- .set(DiskQueue.DISK_QUEUE_PATH_KEY, HAMA_TMP_DISK_QUEUE_OUTPUT);
configuration.set(Constants.ZOOKEEPER_QUORUM, "localhost");
configuration.setInt(Constants.ZOOKEEPER_CLIENT_PORT, 21810);
configuration.set("hama.sync.client.class",
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java?rev=1659134&r1=1659133&r2=1659134&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IncomingVertexMessageManager.java Thu Feb 12 02:19:59 2015
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentLi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
@@ -50,6 +51,11 @@ public class IncomingVertexMessageManage
}
@Override
+ public void addBundle(BSPMessageBundle<GraphJobMessage> bundle) {
+ addAll(bundle);
+ }
+
+ @Override
public void addAll(Iterable<GraphJobMessage> col) {
for (GraphJobMessage m : col)
add(m);
@@ -92,10 +98,8 @@ public class IncomingVertexMessageManage
}
// empty, not needed to implement
-
@Override
public void init(Configuration conf, TaskAttemptID id) {
-
}
@Override
@@ -104,28 +108,9 @@ public class IncomingVertexMessageManage
}
@Override
- public void prepareRead() {
-
- }
-
- @Override
- public void prepareWrite() {
-
- }
-
- @Override
- public boolean isMessageSerialized() {
- return false;
- }
-
- @Override
- public boolean isMemoryBasedQueue() {
- return true;
- }
-
- @Override
public MessageQueue<GraphJobMessage> getMessageQueue() {
return this;
}
+
}