You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by su...@apache.org on 2013/04/20 17:10:58 UTC
svn commit: r1470171 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/message/
core/src/main/java/org/apache/hama/bsp/message/bundle/
core/src/main/java/org/apache/hama/bsp/message/io/
core/src/main/java/org/apache/hama/bsp/message/queue/ co...
Author: surajsmenon
Date: Sat Apr 20 15:10:57 2013
New Revision: 1470171
URL: http://svn.apache.org/r1470171
Log:
Fixes for [HAMA-707] and [HAMA-722]
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Sat Apr 20 15:10:57 2013
@@ -8,7 +8,11 @@ Release 0.7 (unreleased changes)
BUG FIXES
+
IMPROVEMENTS
+
+ HAMA-707: BSPMessageBundle should be able to encapsulate messages serialized in ByteBuffer (surajsmenon)
+ HAMA-722: Messaging queue should construct sender and receiver queue. (surajsmenon)
Release 0.6.1 - April 01, 2013
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Sat Apr 20 15:10:57 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.conf.Configurab
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
@@ -40,7 +39,6 @@ import org.apache.hama.bsp.TaskAttemptID
import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.queue.MemoryQueue;
import org.apache.hama.bsp.message.queue.MessageQueue;
-import org.apache.hama.bsp.message.queue.MessageTransferQueue;
import org.apache.hama.bsp.message.queue.SingleLockQueue;
import org.apache.hama.bsp.message.queue.SynchronizedQueue;
import org.apache.hama.util.BSPNetUtils;
@@ -208,12 +206,9 @@ public abstract class AbstractMessageMan
* @return a <b>new</b> queue implementation.
*/
protected MessageQueue<M> getSenderQueue() {
- Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
- LOG.debug("Creating new " + queueClass);
@SuppressWarnings("unchecked")
- MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
- .newInstance(queueClass, conf);
- MessageQueue<M> queue = newInstance.getSenderQueue();
+ MessageQueue<M> queue = MessageTransferQueueFactory
+ .getMessageTransferQueue(conf).getSenderQueue(conf);
queue.init(conf, attemptId);
return queue;
}
@@ -227,12 +222,9 @@ public abstract class AbstractMessageMan
* @return a <b>new</b> queue implementation.
*/
protected MessageQueue<M> getReceiverQueue() {
- Class<?> queueClass = conf.getClass(QUEUE_TYPE_CLASS, MemoryQueue.class);
- LOG.debug("Creating new " + queueClass);
@SuppressWarnings("unchecked")
- MessageTransferQueue<M> newInstance = (MessageTransferQueue<M>) ReflectionUtils
- .newInstance(queueClass, conf);
- MessageQueue<M> queue = newInstance.getReceiverQueue();
+ MessageQueue<M> queue = MessageTransferQueueFactory
+ .getMessageTransferQueue(conf).getReceiverQueue(conf);
queue.init(conf, attemptId);
return queue;
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Sat Apr 20 15:10:57 2013
@@ -36,7 +36,11 @@ import org.apache.hama.bsp.message.queue
*/
public interface MessageManager<M extends Writable> {
+ @Deprecated
public static final String QUEUE_TYPE_CLASS = "hama.messenger.queue.class";
+ public static final String RECEIVE_QUEUE_TYPE_CLASS = "hama.messenger.receive.queue.class";
+ public static final String SENDER_QUEUE_TYPE_CLASS = "hama.messenger.sender.queue.class";
+ public static final String TRANSFER_QUEUE_TYPE_CLASS = "hama.messenger.xfer.queue.class";
public static final String MAX_CACHED_CONNECTIONS_KEY = "hama.messenger.max.cached.connections";
/**
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageTransferQueueFactory.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageTransferQueue;
+import org.apache.hama.util.ReflectionUtils;
+
+/**
+ * Factory class to define protocols between the sender and receiver queues.
+ *
+ * @param <M> The message type.
+ */
+public class MessageTransferQueueFactory<M> {
+
+ private static final Log LOG = LogFactory.getLog(MessageTransferQueueFactory.class);
+
+ private static class BackwardCompatibleTransferQueue<M> implements
+ MessageTransferQueue<M> {
+
+ @Override
+ public MessageQueue<M> getSenderQueue(Configuration conf) {
+ return getMessageQueue(conf);
+ }
+
+ @Override
+ public MessageQueue<M> getReceiverQueue(Configuration conf) {
+ return getMessageQueue(conf);
+ }
+
+ @SuppressWarnings({ "unchecked", "deprecation" })
+ private MessageQueue<M> getMessageQueue(Configuration conf) {
+ return ReflectionUtils.newInstance(conf.getClass(
+ MessageManager.QUEUE_TYPE_CLASS, MemoryQueue.class,
+ MessageQueue.class));
+ }
+
+ }
+
+ private static class DefaultMessageTransferQueue<M> implements
+ MessageTransferQueue<M> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public MessageQueue<M> getSenderQueue(Configuration conf) {
+ return ReflectionUtils.newInstance(conf.getClass(
+ MessageManager.SENDER_QUEUE_TYPE_CLASS, MemoryQueue.class,
+ MessageQueue.class));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public MessageQueue<M> getReceiverQueue(Configuration conf) {
+ return ReflectionUtils.newInstance(conf.getClass(
+ MessageManager.RECEIVE_QUEUE_TYPE_CLASS, MemoryQueue.class,
+ MessageQueue.class));
+ }
+
+ }
+
+ @SuppressWarnings({ "rawtypes", "deprecation" })
+ public static MessageTransferQueue getMessageTransferQueue(Configuration conf) {
+
+ if (conf.getClass(MessageManager.QUEUE_TYPE_CLASS, null) != null) {
+ LOG.warn("Message queue is configured on deprecated parameter:"
+ + MessageManager.QUEUE_TYPE_CLASS);
+ return new BackwardCompatibleTransferQueue();
+ }
+ return (MessageTransferQueue) ReflectionUtils.newInstance(conf.getClass(
+ MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+ DefaultMessageTransferQueue.class, MessageTransferQueue.class));
+
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/BSPMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A group of BSP messages that is handled as one unit, so that messages can be
+ * sent in batch rather than individually.
+ *
+ * @param <M> Message type.
+ */
+public interface BSPMessageBundle<M extends Writable> {
+
+  /**
+   * Reports the size of this bundle.
+   *
+   * @return Size of serialized message bundle. -1 if the size is not known.
+   */
+  long getSize();
+
+  /**
+   * Reports how many elements this bundle holds.
+   *
+   * @return Number of elements. -1 if the number of elements is not known.
+   */
+  int getNumElements();
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/ByteBufferBSPMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BSP Message Bundle that encapsulates a ByteBuffer.
+ *
+ * @param <M> Message type.
+ */
+public class ByteBufferBSPMessageBundle<M extends Writable> implements
+ BSPMessageBundle<M> {
+
+ private ByteBuffer[] byteArr;
+ private int count;
+
+ public ByteBufferBSPMessageBundle(ByteBuffer[] buffer, int count) {
+ byteArr = buffer;
+ this.count = count;
+ }
+
+ public ByteBufferBSPMessageBundle(ByteBuffer[] buffer) {
+ this(buffer, -1);
+ }
+
+ public ByteBuffer[] getBuffers() {
+ return byteArr;
+ }
+
+ @Override
+ public long getSize() {
+ return byteArr.length;
+ }
+
+ @Override
+ public int getNumElements() {
+ return count;
+ }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/HeapByteArrayBSPMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BSP Message Bundle that stores the serialized messages in a heap byte array.
+ *
+ * @param <M> Message type.
+ */
+public class HeapByteArrayBSPMessageBundle<M extends Writable> implements
+    BSPMessageBundle<M> {
+
+  byte[] byteArr;
+  int count;
+
+  /**
+   * Creates a bundle whose number of messages is not known (-1).
+   *
+   * @param buffer array holding the serialized messages.
+   */
+  public HeapByteArrayBSPMessageBundle(byte[] buffer) {
+    this(buffer, -1);
+  }
+
+  /**
+   * @param buffer array holding the serialized messages; kept by reference,
+   *          not copied.
+   * @param count number of messages contained in the array, or -1 if not
+   *          known.
+   */
+  public HeapByteArrayBSPMessageBundle(byte[] buffer, int count) {
+    byteArr = buffer;
+    this.count = count;
+  }
+
+  /**
+   * @return the byte array passed to the constructor.
+   */
+  public byte[] getBuffer() {
+    return byteArr;
+  }
+
+  /**
+   * @return length of the byte array in bytes, or -1 if no array is set.
+   */
+  @Override
+  public long getSize() {
+    return byteArr == null ? -1 : byteArr.length;
+  }
+
+  /**
+   * Returns the message count supplied at construction. The byte length of
+   * the array is reported by {@link #getSize()}, not here.
+   *
+   * @return number of messages, -1 if not known.
+   */
+  @Override
+  public int getNumElements() {
+    return count;
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/POJOMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Message bundle that keeps the messages as plain objects, grouped by the name
+ * of their class.
+ *
+ * @param <M> Message type.
+ */
+public class POJOMessageBundle<M extends Writable> implements
+    BSPMessageBundle<M>, Iterable<M> {
+
+  protected static final Log LOG = LogFactory.getLog(POJOMessageBundle.class);
+
+  /** Messages keyed by the name of their class. */
+  protected HashMap<String, List<M>> messages = new HashMap<String, List<M>>();
+  /** Message classes keyed by name; not written by this class itself. */
+  protected HashMap<String, Class<M>> classCache = new HashMap<String, Class<M>>();
+
+  /** Number of messages added through {@link #addMessage(Writable)}. */
+  protected int numElements;
+
+  /**
+   * Iterates over all messages of all per-class lists, one list after the
+   * other.
+   */
+  private static class BundleIterator<M extends Writable> implements
+      Iterator<M> {
+
+    private final Iterator<List<M>> listIterator;
+    private Iterator<M> messageIterator;
+
+    public BundleIterator(Iterator<List<M>> listIterator) {
+      this.listIterator = listIterator;
+    }
+
+    /**
+     * Moves on to the next non-exhausted list if needed. Safe on an empty
+     * bundle, where no message iterator has been created yet.
+     */
+    @Override
+    public boolean hasNext() {
+      while ((messageIterator == null || !messageIterator.hasNext())
+          && listIterator.hasNext()) {
+        messageIterator = listIterator.next().iterator();
+      }
+      return messageIterator != null && messageIterator.hasNext();
+    }
+
+    /**
+     * @throws java.util.NoSuchElementException if no message is left.
+     */
+    @Override
+    public M next() {
+      if (!hasNext()) {
+        throw new java.util.NoSuchElementException();
+      }
+      return messageIterator.next();
+    }
+
+    /**
+     * Removal is not supported; fail loudly instead of silently ignoring the
+     * call.
+     */
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException(
+          "remove is not supported by this iterator");
+    }
+
+  }
+
+  public POJOMessageBundle() {
+  }
+
+  /**
+   * Add message to this bundle.
+   *
+   * @param message BSPMessage to add.
+   */
+  public void addMessage(M message) {
+    String className = message.getClass().getName();
+    List<M> list = messages.get(className);
+    ++numElements;
+    if (list == null) {
+      list = new ArrayList<M>();
+      messages.put(className, list);
+    }
+
+    list.add(message);
+  }
+
+  /**
+   * Returns all messages of this bundle merged into one new list.
+   *
+   * @return a new list; changes to it do not affect this bundle.
+   */
+  public List<M> getMessages() {
+    // an ArrayList because callers may need random access; presized with the
+    // number of messages (messages.size() would only be the number of classes)
+    List<M> mergeList = new ArrayList<M>(Math.max(numElements, 0));
+    for (List<M> c : messages.values()) {
+      mergeList.addAll(c);
+    }
+    return mergeList;
+  }
+
+  @Override
+  public Iterator<M> iterator() {
+    return new BundleIterator<M>(this.messages.values().iterator());
+  }
+
+  /**
+   * @return the number of messages; this implementation does not compute a
+   *         serialized byte size.
+   */
+  @Override
+  public long getSize() {
+    return numElements;
+  }
+
+  /**
+   * @return the number of messages in this bundle.
+   */
+  @Override
+  public int getNumElements() {
+    return numElements;
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/WritableMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.bundle;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * {@link POJOMessageBundle} that can be serialized as a Hadoop
+ * {@link Writable}.
+ * <p>
+ * Wire format: the number of message classes, then per class its name (UTF),
+ * the number of messages of that class, and the messages themselves.
+ *
+ * @param <M> Message type.
+ */
+public class WritableMessageBundle<M extends Writable> extends
+    POJOMessageBundle<M> implements Writable {
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    // writes the k/v mapping size
+    out.writeInt(messages.size());
+    for (Entry<String, List<M>> entry : messages.entrySet()) {
+      out.writeUTF(entry.getKey());
+      List<M> messageList = entry.getValue();
+      out.writeInt(messageList.size());
+      for (M msg : messageList) {
+        msg.write(out);
+      }
+    }
+  }
+
+  /**
+   * Reads the per-class message lists written by {@link #write(DataOutput)}.
+   * A list read for a class replaces any list already stored for that class,
+   * and the element count is adjusted to match.
+   *
+   * @throws IOException if reading fails or a message class named in the
+   *           stream cannot be loaded.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    if (messages == null) {
+      messages = new HashMap<String, List<M>>();
+    }
+    int numMessages = in.readInt();
+    for (int entries = 0; entries < numMessages; entries++) {
+      String className = in.readUTF();
+      int size = in.readInt();
+      Class<M> clazz = resolveMessageClass(className);
+
+      List<M> msgList = new ArrayList<M>(size);
+      for (int i = 0; i < size; i++) {
+        M msg = ReflectionUtils.newInstance(clazz, null);
+        msg.readFields(in);
+        msgList.add(msg);
+      }
+
+      // keep numElements in step with the stored lists so that
+      // getNumElements()/getSize() are right for a deserialized bundle
+      List<M> replaced = messages.put(className, msgList);
+      if (replaced != null) {
+        numElements -= replaced.size();
+      }
+      numElements += msgList.size();
+    }
+  }
+
+  /**
+   * Looks up the message class by name, consulting the class cache first.
+   *
+   * @throws IOException if the class cannot be found; without the class the
+   *           rest of the stream cannot be decoded.
+   */
+  @SuppressWarnings("unchecked")
+  private Class<M> resolveMessageClass(String className) throws IOException {
+    Class<M> clazz = classCache.get(className);
+    if (clazz == null) {
+      try {
+        clazz = (Class<M>) Class.forName(className);
+      } catch (ClassNotFoundException e) {
+        throw new IOException("Message class was not found: " + className, e);
+      }
+      classCache.put(className, clazz);
+    }
+    return clazz;
+  }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/io/SpillingDataOutputBuffer.java Sat Apr 20 15:10:57 2013
@@ -441,6 +441,7 @@ public class SpillingDataOutputBuffer ex
}
}
+ closed_ = true;
}
}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+
+/**
+ * Contract for components that accept whole message bundles.
+ *
+ * @param <M> Message type.
+ */
+public interface BSPMessageInterface<M extends Writable> {
+
+  /**
+   * Hands a bundle of messages to this component.
+   *
+   * @param bundle the bundle to add.
+   */
+  void add(BSPMessageBundle<M> bundle);
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+
+/**
+ * Base class for message queues that also accept message bundles.
+ *
+ * @param <M> Message type.
+ */
+public abstract class ByteArrayMessageQueue<M extends Writable> implements
+    BSPMessageInterface<M>, MessageQueue<M> {
+
+  /**
+   * Not implemented yet: the bundle is ignored and nothing is enqueued.
+   *
+   * @param bundle the bundle to add (currently unused).
+   */
+  @Override
+  public void add(BSPMessageBundle<M> bundle) {
+    // intentionally empty until bundle handling is implemented
+  }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Sat Apr 20 15:10:57 2013
@@ -18,7 +18,6 @@
package org.apache.hama.bsp.message.queue;
import java.io.IOException;
-import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.logging.Log;
@@ -46,8 +45,7 @@ import org.apache.hama.bsp.TaskAttemptID
* configuration. <br/>
* <b>It is experimental to use.</b>
*/
-public final class DiskQueue<M extends Writable> implements MessageQueue<M>,
- MessageTransferQueue<M> {
+public final class DiskQueue<M extends Writable> extends POJOMessageQueue<M> {
public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
@@ -172,7 +170,7 @@ public final class DiskQueue<M extends W
}
@Override
- public final void addAll(Collection<M> col) {
+ public final void addAll(Iterable<M> col) {
for (M item : col) {
add(item);
}
@@ -310,15 +308,4 @@ public final class DiskQueue<M extends W
public boolean isMessageSerialized() {
return false;
}
-
- @Override
- public MessageQueue<M> getSenderQueue() {
- return this;
- }
-
- @Override
- public MessageQueue<M> getReceiverQueue() {
- return this;
- }
-
}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskTransferProtocolQueue.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Transfer protocol that uses a {@link DiskQueue} on both the sending and the
+ * receiving side.
+ *
+ * @param <M> Message type.
+ */
+public class DiskTransferProtocolQueue<M extends Writable> implements
+    MessageTransferQueue<M> {
+
+  /**
+   * @param conf not used by this protocol.
+   * @return a new {@link DiskQueue}.
+   */
+  @Override
+  public MessageQueue<M> getSenderQueue(Configuration conf) {
+    return newDiskQueue();
+  }
+
+  /**
+   * @param conf not used by this protocol.
+   * @return a new {@link DiskQueue}.
+   */
+  @Override
+  public MessageQueue<M> getReceiverQueue(Configuration conf) {
+    return newDiskQueue();
+  }
+
+  /** Creates the queue instance shared by both factory methods. */
+  private MessageQueue<M> newDiskQueue() {
+    MessageQueue<M> diskQueue = new DiskQueue<M>();
+    return diskQueue;
+  }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Sat Apr 20 15:10:57 2013
@@ -18,7 +18,6 @@
package org.apache.hama.bsp.message.queue;
import java.util.ArrayDeque;
-import java.util.Collection;
import java.util.Deque;
import java.util.Iterator;
@@ -29,15 +28,15 @@ import org.apache.hama.bsp.TaskAttemptID
/**
* LinkedList backed queue structure for bookkeeping messages.
*/
-public final class MemoryQueue<M extends Writable> implements MessageQueue<M>,
- MessageTransferQueue<M> {
+public final class MemoryQueue<M extends Writable> extends POJOMessageQueue<M> {
private final Deque<M> deque = new ArrayDeque<M>();
private Configuration conf;
@Override
- public final void addAll(Collection<M> col) {
- deque.addAll(col);
+ public final void addAll(Iterable<M> col) {
+ for (M m : col)
+ deque.add(m);
}
@Override
@@ -108,15 +107,4 @@ public final class MemoryQueue<M extends
public boolean isMessageSerialized() {
return false;
}
-
- @Override
- public MessageQueue<M> getSenderQueue() {
- return this;
- }
-
- @Override
- public MessageQueue<M> getReceiverQueue() {
- return this;
- }
-
}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryTransferProtocol.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * {@link MessageTransferQueue} that creates a new {@link MemoryQueue} per call, for sender and receiver alike.
+ * NOTE(review): {@code conf} is not read in this class; presumably the caller configures the queue - confirm.
+ * @param <M> the message type carried by the queues
+ */
+public class MemoryTransferProtocol<M extends Writable> implements
+ MessageTransferQueue<M> {
+ /** Returns a new {@link MemoryQueue} to be used as the sender queue. */
+ @Override
+ public MessageQueue<M> getSenderQueue(Configuration conf) {
+ return new MemoryQueue<M>();
+ }
+ /** Returns a new {@link MemoryQueue} to be used as the receiver queue. */
+ @Override
+ public MessageQueue<M> getReceiverQueue(Configuration conf) {
+ return new MemoryQueue<M>();
+ }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Sat Apr 20 15:10:57 2013
@@ -17,8 +17,6 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.Collection;
-
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.bsp.TaskAttemptID;
@@ -51,7 +49,7 @@ public interface MessageQueue<M> extends
/**
* Adds a whole Java Collection to the implementing queue.
*/
- public void addAll(Collection<M> col);
+ public void addAll(Iterable<M> col);
/**
* Adds the other queue to this queue.
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageTransferQueue.java Sat Apr 20 15:10:57 2013
@@ -17,21 +17,23 @@
*/
package org.apache.hama.bsp.message.queue;
+import org.apache.hadoop.conf.Configuration;
+
/**
- *
+ * Interface to define the sender queue and receiver queue protocol.
*
* @param <M>
*/
public interface MessageTransferQueue<M> {
-
+
/**
- *
+ * Instantiate a sender queue.
*/
- public MessageQueue<M> getSenderQueue();
+ public MessageQueue<M> getSenderQueue(Configuration conf);
/**
- *
+ * Instantiate a receiver queue.
*/
- public MessageQueue<M> getReceiverQueue();
+ public MessageQueue<M> getReceiverQueue(Configuration conf);
}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
+
+/**
+ * Base class for message queues that take a {@link POJOMessageBundle} and add
+ * its messages one by one through {@code addAll}.
+ * @param <M> Message type.
+ */
+public abstract class POJOMessageQueue<M extends Writable> implements
+ BSPMessageInterface<M>, Iterable<M>, MessageQueue<M> {
+ /** Adds every message of {@code bundle} to this queue by delegating to {@code addAll}. */
+ @Override
+ public void add(BSPMessageBundle<M> bundle){
+ this.addAll((POJOMessageBundle<M>)bundle); // bundle must be a POJOMessageBundle, otherwise ClassCastException
+ }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Sat Apr 20 15:10:57 2013
@@ -17,7 +17,6 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.Collection;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
@@ -117,9 +116,10 @@ public final class SingleLockQueue<T> im
* org.apache.hama.bsp.message.SynchronizedQueue#addAll(java.util.Collection)
*/
@Override
- public void addAll(Collection<T> col) {
+ public void addAll(Iterable<T> col) {
synchronized (mutex) {
- queue.addAll(col);
+ for (T m : col)
+ queue.add(m);
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Sat Apr 20 15:10:57 2013
@@ -17,20 +17,21 @@
*/
package org.apache.hama.bsp.message.queue;
-import java.util.Collection;
import java.util.Iterator;
import java.util.PriorityQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
/**
* Heap (Java's priority queue) based message queue implementation that supports
* sorted receive and send.
*/
public final class SortedMessageQueue<M extends WritableComparable<M>>
- implements MessageQueue<M>, MessageTransferQueue<M> {
+ implements MessageQueue<M>, BSPMessageInterface<M> {
private final PriorityQueue<M> queue = new PriorityQueue<M>();
private Configuration conf;
@@ -51,8 +52,9 @@ public final class SortedMessageQueue<M
}
@Override
- public void addAll(Collection<M> col) {
- queue.addAll(col);
+ public void addAll(Iterable<M> col) {
+ for (M m : col)
+ queue.add(m);
}
@Override
@@ -111,13 +113,8 @@ public final class SortedMessageQueue<M
}
@Override
- public MessageQueue<M> getSenderQueue() {
- return this;
- }
-
- @Override
- public MessageQueue<M> getReceiverQueue() {
- return this;
+ public void add(BSPMessageBundle<M> bundle) {
+ addAll((POJOMessageBundle<M>) bundle);
}
}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Sat Apr 20 15:10:57 2013
@@ -22,7 +22,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.math.BigInteger;
import java.security.SecureRandom;
-import java.util.Collection;
import java.util.Iterator;
import org.apache.commons.logging.Log;
@@ -33,6 +32,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.message.bundle.HeapByteArrayBSPMessageBundle;
import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
import org.apache.hama.bsp.message.io.PreFetchCache;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
@@ -44,8 +45,8 @@ import org.apache.hama.bsp.message.io.Sp
*
* @param <M>
*/
-public class SpillingQueue<M extends Writable> implements MessageQueue<M>,
- MessageTransferQueue<M> {
+public class SpillingQueue<M extends Writable> extends ByteArrayMessageQueue<M>
+ implements MessageTransferQueue<M> {
private static final Log LOG = LogFactory.getLog(SpillingQueue.class);
@@ -144,7 +145,7 @@ public class SpillingQueue<M extends Wri
}
@Override
- public void addAll(Collection<M> msgs) {
+ public void addAll(Iterable<M> msgs) {
for (M msg : msgs) {
add(msg);
}
@@ -342,13 +343,23 @@ public class SpillingQueue<M extends Wri
}
@Override
- public MessageQueue<M> getSenderQueue() {
+ public MessageQueue<M> getSenderQueue(Configuration conf) {
return this;
}
@Override
- public MessageQueue<M> getReceiverQueue() {
+ public MessageQueue<M> getReceiverQueue(Configuration conf) {
return this;
}
+ @Override
+ public void add(BSPMessageBundle<M> bundle) {
+ try {
+ this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle<M>) bundle)
+ .getBuffer());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueueTransferProtocol.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * {@link MessageTransferQueue} that creates a new {@link SpillingQueue} per call, for sender and receiver alike.
+ * NOTE(review): {@code conf} is not read in this class; presumably the caller configures the queue - confirm.
+ * @param <M> The message type.
+ */
+public class SpillingQueueTransferProtocol<M extends Writable> implements
+ MessageTransferQueue<M> {
+ /** Returns a new {@link SpillingQueue} to be used as the sender queue. */
+ @Override
+ public MessageQueue<M> getSenderQueue(Configuration conf) {
+ return new SpillingQueue<M>();
+ }
+ /** Returns a new {@link SpillingQueue} to be used as the receiver queue. */
+ @Override
+ public MessageQueue<M> getReceiverQueue(Configuration conf) {
+ return new SpillingQueue<M>();
+ }
+}
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1470171&r1=1470170&r2=1470171&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Sat Apr 20 15:10:57 2013
@@ -33,8 +33,10 @@ import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.queue.DiskQueue;
-import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.DiskTransferProtocolQueue;
+import org.apache.hama.bsp.message.queue.MemoryTransferProtocol;
import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageTransferQueue;
import org.apache.hama.util.BSPNetUtils;
public class TestHadoopMessageManager extends TestCase {
@@ -46,8 +48,8 @@ public class TestHadoopMessageManager ex
public void testMemoryMessaging() throws Exception {
Configuration conf = new Configuration();
- conf.set(MessageManager.QUEUE_TYPE_CLASS,
- MemoryQueue.class.getCanonicalName());
+ conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+ MemoryTransferProtocol.class, MessageTransferQueue.class);
conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
messagingInternal(conf);
}
@@ -55,6 +57,8 @@ public class TestHadoopMessageManager ex
public void testDiskMessaging() throws Exception {
Configuration conf = new Configuration();
conf.set(DiskQueue.DISK_QUEUE_PATH_KEY, TMP_OUTPUT_PATH);
+ conf.setClass(MessageManager.TRANSFER_QUEUE_TYPE_CLASS,
+ DiskTransferProtocolQueue.class, MessageTransferQueue.class);
messagingInternal(conf);
}
Added: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java?rev=1470171&view=auto
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java (added)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java Sat Apr 20 15:10:57 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashSet;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
+import org.apache.hama.bsp.message.bundle.WritableMessageBundle;
+
+public class TestMessageBundle extends TestCase {
+ // Tests POJOMessageBundle and WritableMessageBundle: counts, iteration, and a write/readFields round trip.
+ public void testPOJOWritableMessageBundle() {
+ // Fill a POJO bundle with IntWritable 0..99 and check both reported counts.
+ POJOMessageBundle<IntWritable> messageBundle = new POJOMessageBundle<IntWritable>();
+ for (int i = 0; i < 100; ++i) {
+ messageBundle.addMessage(new IntWritable(i));
+ }
+ assertEquals(100, messageBundle.getSize());
+ assertEquals(100, messageBundle.getNumElements());
+ // Iteration is expected to yield the values in insertion order.
+ int i = 0;
+ for (IntWritable writable : messageBundle) {
+ assertEquals(i++, writable.get());
+ }
+
+ }
+
+ public void testDifferentWritableMessageBundle() {
+ WritableMessageBundle<Writable> messageBundle = new WritableMessageBundle<Writable>();
+ int numElements = 5;
+ // Mirror of every message added; it is emptied again while iterating the bundle.
+ HashSet<Writable> set = new HashSet<Writable>();
+ // Two Writable types go into the same bundle: IntWritable first, then Text.
+ for (int i = 0; i < numElements; ++i) {
+ Writable w = new IntWritable(i);
+ set.add(w);
+ messageBundle.addMessage(w);
+ }
+ String msg;
+ for (int i = 0; i < numElements; ++i) {
+ msg = "" + i;
+ Writable w = new Text(msg);
+ set.add(w);
+ messageBundle.addMessage(w);
+ }
+
+ assertEquals(2 * numElements, messageBundle.getSize());
+ assertEquals(2 * numElements, messageBundle.getNumElements());
+ // Every message that was added must be seen during iteration, leaving the mirror set empty.
+ for (Writable writable : messageBundle) {
+ set.remove(writable);
+ }
+ assertTrue(set.isEmpty());
+
+ }
+
+ public void testReadWriteWritableMessageBundle() throws IOException {
+ WritableMessageBundle<Writable> messageBundle = new WritableMessageBundle<Writable>();
+ int numElements = 5;
+ // Mirror of every message added; it is emptied while iterating the bundle that is read back.
+ HashSet<Writable> set = new HashSet<Writable>();
+ // Same mixed content as above: IntWritable first, then Text.
+ for (int i = 0; i < numElements; ++i) {
+ Writable w = new IntWritable(i);
+ set.add(w);
+ messageBundle.addMessage(w);
+ }
+ String msg;
+ for (int i = 0; i < numElements; ++i) {
+ msg = "" + i;
+ Writable w = new Text(msg);
+ set.add(w);
+ messageBundle.addMessage(w);
+ }
+
+ assertEquals(2 * numElements, messageBundle.getSize());
+ assertEquals(2 * numElements, messageBundle.getNumElements());
+ // Serialize the bundle into an in-memory buffer.
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream(1024);
+ DataOutput output = new DataOutputStream(outputStream);
+ messageBundle.write(output);
+ // Read the same bytes back into a fresh bundle.
+ ByteArrayInputStream inStream = new ByteArrayInputStream(
+ outputStream.toByteArray());
+ DataInput in = new DataInputStream(inStream);
+ WritableMessageBundle<Writable> newBundle = new WritableMessageBundle<Writable>();
+ newBundle.readFields(in);
+ // NOTE(review): only checks that each original message reappears; newBundle's size/element count is not asserted.
+ for (Writable writable : newBundle) {
+ set.remove(writable);
+ }
+ assertTrue(set.isEmpty());
+
+ }
+
+}