You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ap...@apache.org on 2012/11/21 20:14:22 UTC
svn commit: r1412231 - in /hama/trunk/core/src:
main/java/org/apache/hama/bsp/ main/java/org/apache/hama/bsp/message/
main/java/org/apache/hama/bsp/message/queue/ test/java/org/apache/hama/bsp/
test/java/org/apache/hama/bsp/message/
Author: apurv
Date: Wed Nov 21 19:14:21 2012
New Revision: 1412231
URL: http://svn.apache.org/viewvc?rev=1412231&view=rev
Log:
Abstracted queues in a separate package
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
Modified:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Wed Nov 21 19:14:21 2012
@@ -42,7 +42,7 @@ import org.apache.hama.bsp.ft.BSPFaultTo
import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageManager;
import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncException;
import org.apache.hama.bsp.sync.SyncServiceFactory;
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Wed Nov 21 19:14:21 2012
@@ -37,6 +37,11 @@ import org.apache.hama.bsp.BSPMessageBun
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.SingleLockQueue;
+import org.apache.hama.bsp.message.queue.SynchronizedQueue;
import org.apache.hama.util.BSPNetUtils;
/**
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Wed Nov 21 19:14:21 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.MessageQueue;
/**
* This manager takes care of the messaging. It is responsible to launch a
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A disk based queue that is backed by a raw file on local disk. <br/>
+ * Structure is as follows: <br/>
+ * If "bsp.disk.queue.dir" is not defined, "hama.tmp.dir" will be used instead. <br/>
+ * ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/ <br/>
+ * An ongoing sequencenumber will be appended to prevent inner collisions,
+ * however the job_id dir will never be deleted. So you need a cronjob to do the
+ * cleanup for you. <br/>
+ * It is recommended to use the file:// scheme in front of the property, because
+ * writes on DFS are expensive, however your local disk may not have enough
+ * space for your message, so you can easily switch per job via your
+ * configuration. <br/>
+ * <b>It is experimental to use.</b>
+ */
+public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
+
+  /** Configuration key of the base directory the queue files are put in. */
+  public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
+
+  /** Number of read attempts poll() makes before it gives up. */
+  private static final int MAX_RETRIES = 4;
+  private static final Log LOG = LogFactory.getLog(DiskQueue.class);
+
+  // Shared by every DiskQueue in this JVM. It must only be advanced through
+  // nextSequenceNumber(), because "++" on a volatile field is not atomic.
+  private static volatile int ONGOING_SEQUENCE_NUMBER = 0;
+
+  // number of messages that were written but not yet polled
+  private int size = 0;
+  // injected via reflection (setConf) and refreshed by init
+  private Configuration conf;
+  private FileSystem fs;
+
+  private FSDataOutputStream writer;
+  private FSDataInputStream reader;
+
+  private Path queuePath;
+  private TaskAttemptID id;
+  private final ObjectWritable writable = new ObjectWritable();
+
+  /**
+   * Creates the queue directory and a fresh queue file and opens the file for
+   * writing.
+   *
+   * @param conf the configuration used to obtain the file system and the
+   *          queue directory.
+   * @param id the task attempt this queue belongs to.
+   * @throws RuntimeException if the file system, directory or file cannot be
+   *           set up.
+   */
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    // remember the configuration, clear() re-initializes the queue with it
+    this.conf = conf;
+    this.id = id;
+    writable.setConf(conf);
+    try {
+      fs = FileSystem.get(conf);
+      Path queueDir = getQueueDir(conf, id, conf.get(DISK_QUEUE_PATH_KEY));
+      fs.mkdirs(queueDir);
+      queuePath = new Path(queueDir, nextSequenceNumber() + "_messages.seq");
+      prepareWrite();
+    } catch (IOException e) {
+      // we can't recover if something bad happens here..
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Hands out the next file sequence number. Synchronized so that two queues
+   * initialized concurrently never receive the same number (and file name).
+   */
+  private static synchronized int nextSequenceNumber() {
+    return ONGOING_SEQUENCE_NUMBER++;
+  }
+
+  /**
+   * Closes the streams, deletes the queue file and then recursively deletes
+   * the directory the queue file lives in. Does nothing beyond closing the
+   * streams if the queue was never successfully initialized.
+   */
+  @Override
+  public void close() {
+    closeInternal(true);
+    if (fs == null || queuePath == null) {
+      // init never got far enough to create anything on disk
+      return;
+    }
+    try {
+      fs.delete(queuePath.getParent(), true);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Closes writer and reader and optionally removes the queue file. Failures
+   * are logged, never thrown. Both stream references are reset even if
+   * closing failed, so a broken stream is not reused later on.
+   *
+   * @param delete true if the queue file should be deleted as well.
+   */
+  private void closeInternal(boolean delete) {
+    if (writer != null) {
+      try {
+        writer.flush();
+      } catch (IOException e) {
+        LOG.error(e);
+      } finally {
+        // close even if the flush failed, otherwise the stream leaks
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.error(e);
+        }
+        writer = null;
+      }
+    }
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        LOG.error(e);
+      } finally {
+        reader = null;
+      }
+    }
+    // delete last, after no stream of ours is open on the file anymore
+    if (delete && fs != null && queuePath != null) {
+      try {
+        fs.delete(queuePath, true);
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    }
+  }
+
+  /**
+   * Closes the writer (keeping the file) and opens the queue file for
+   * reading.
+   *
+   * @throws RuntimeException if the file cannot be opened.
+   */
+  @Override
+  public void prepareRead() {
+    // make sure we've closed
+    closeInternal(false);
+    try {
+      reader = fs.open(queuePath);
+    } catch (IOException e) {
+      // can't recover from that
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Creates the queue file and opens it for writing.
+   *
+   * @throws RuntimeException if the file cannot be created.
+   */
+  @Override
+  public void prepareWrite() {
+    try {
+      writer = fs.create(queuePath);
+    } catch (IOException e) {
+      // can't recover from that
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Appends every item of the collection, in iteration order. */
+  @Override
+  public final void addAll(Collection<M> col) {
+    for (M item : col) {
+      add(item);
+    }
+  }
+
+  /** Drains the other queue via poll() and appends each polled item. */
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    M poll = null;
+    while ((poll = otherqueue.poll()) != null) {
+      add(poll);
+    }
+  }
+
+  /**
+   * Serializes the item to the queue file. A failed write is logged and the
+   * item is not counted, so size() only reflects messages actually written.
+   */
+  @Override
+  public final void add(M item) {
+    try {
+      new ObjectWritable(item).write(writer);
+      // count only after the write succeeded
+      size++;
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  /** Drops the current queue file and starts over with a new, empty one. */
+  @Override
+  public final void clear() {
+    closeInternal(true);
+    size = 0;
+    init(conf, id);
+  }
+
+  /**
+   * Reads the next message from the queue file. After the last message was
+   * read the streams are closed and the queue file is deleted.
+   *
+   * @return the next message or null if the queue is empty.
+   * @throws RuntimeException if the message could not be read within
+   *           MAX_RETRIES attempts; the size is left untouched in that case.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public final M poll() {
+    if (size == 0) {
+      return null;
+    }
+    for (int tries = 1; tries <= MAX_RETRIES; tries++) {
+      try {
+        writable.readFields(reader);
+        M message = (M) writable.get();
+        // only a message that was really read leaves the queue
+        size--;
+        if (size == 0) {
+          closeInternal(true);
+        }
+        return message;
+      } catch (IOException e) {
+        LOG.error("Retrying for the " + tries + "th time!", e);
+      }
+    }
+    throw new RuntimeException("Message couldn't be read for " + MAX_RETRIES
+        + " times! Giving up!");
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /**
+   * Returns a consuming iterator: creating it switches the queue to reading
+   * and every next() polls (removes) a message.
+   */
+  @Override
+  public Iterator<M> iterator() {
+    return new DiskIterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** Iterator that consumes the queue through poll(). */
+  private class DiskIterator implements Iterator<M> {
+
+    public DiskIterator() {
+      prepareRead();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (size == 0) {
+        // nothing left, release the streams but keep the file
+        closeInternal(false);
+        return false;
+      }
+      return true;
+    }
+
+    /** Returns the polled message; null once the queue is exhausted. */
+    @Override
+    public M next() {
+      return poll();
+    }
+
+    @Override
+    public void remove() {
+      // no-op
+    }
+
+  }
+
+  /**
+   * Creates a path for a queue. The base directory is the configured queue
+   * directory if given, else "hama.tmp.dir", else "/tmp/messageStorage/".
+   *
+   * @param conf the configuration "hama.tmp.dir" is looked up in.
+   * @param id the task attempt the directory is created for.
+   * @param configuredQueueDir the explicitly configured base directory, may
+   *          be null.
+   * @return the directory queue files of the given task are stored in.
+   */
+  public static Path getQueueDir(Configuration conf, TaskAttemptID id,
+      String configuredQueueDir) {
+    String baseDir = configuredQueueDir;
+    if (baseDir == null) {
+      baseDir = conf.get("hama.tmp.dir");
+    }
+    if (baseDir == null) {
+      // use some local tmp dir
+      baseDir = "/tmp/messageStorage/";
+    }
+    return createDiskQueuePath(id, baseDir);
+  }
+
+  /**
+   * Creates a generic Path based on the configured path, the job id and the
+   * task id of the given attempt to store disk sequence files. <br/>
+   * Structure is as follows: ${configuredPath}/diskqueue/job_id/task_id/
+   */
+  private static Path createDiskQueuePath(TaskAttemptID id,
+      String configuredPath) {
+    Path diskQueueRoot = new Path(configuredPath, "diskqueue");
+    Path jobDir = new Path(diskQueueRoot, id.getJobID().toString());
+    return new Path(jobDir, id.getTaskID().toString());
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * In-memory message queue that keeps its messages in an {@link ArrayDeque}.
+ * The lifecycle hooks (init, close, prepareRead, prepareWrite) do nothing.
+ */
+public final class MemoryQueue<M extends Writable> implements MessageQueue<M> {
+
+  private final Deque<M> deque = new ArrayDeque<M>();
+  private Configuration conf;
+
+  // ---- lifecycle: nothing to set up or tear down for a heap structure ----
+
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    // intentionally empty
+  }
+
+  @Override
+  public void prepareWrite() {
+    // intentionally empty
+  }
+
+  @Override
+  public void prepareRead() {
+    // intentionally empty
+  }
+
+  @Override
+  public void close() {
+    // intentionally empty
+  }
+
+  // ---- queue operations ----
+
+  /** Appends a single message to the tail. */
+  @Override
+  public final void add(M item) {
+    deque.addLast(item);
+  }
+
+  /** Appends all messages of the collection to the tail. */
+  @Override
+  public final void addAll(Collection<M> col) {
+    deque.addAll(col);
+  }
+
+  /** Drains the other queue via poll() and appends each polled message. */
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    for (M next = otherqueue.poll(); next != null; next = otherqueue.poll()) {
+      deque.addLast(next);
+    }
+  }
+
+  /** Removes and returns the head message, or null if there is none. */
+  @Override
+  public final M poll() {
+    return deque.pollFirst();
+  }
+
+  @Override
+  public final int size() {
+    return deque.size();
+  }
+
+  @Override
+  public final void clear() {
+    deque.clear();
+  }
+
+  /** Iterates from head to tail without removing messages. */
+  @Override
+  public final Iterator<M> iterator() {
+    return deque.iterator();
+  }
+
+  // ---- Configurable ----
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Simple queue interface for messages. Besides the usual queue operations it
+ * declares lifecycle hooks (init, prepareWrite, prepareRead, close) that
+ * implementations may use to manage resources.
+ */
+public interface MessageQueue<M> extends Iterable<M>, Configurable {
+
+  /**
+   * Used to initialize the queue.
+   *
+   * @param conf the configuration to initialize with.
+   * @param id the task attempt the queue is initialized for.
+   */
+  void init(Configuration conf, TaskAttemptID id);
+
+  /**
+   * Finally close the queue. Commonly used to free resources.
+   */
+  void close();
+
+  /**
+   * Called to prepare a queue for reading.
+   */
+  void prepareRead();
+
+  /**
+   * Called to prepare a queue for writing.
+   */
+  void prepareWrite();
+
+  /**
+   * Adds a whole Java Collection to the implementing queue.
+   *
+   * @param col the items to add.
+   */
+  void addAll(Collection<M> col);
+
+  /**
+   * Adds the other queue to this queue.
+   *
+   * @param otherqueue the queue whose items are added.
+   */
+  void addAll(MessageQueue<M> otherqueue);
+
+  /**
+   * Adds a single item to the implementing queue.
+   *
+   * @param item the item to add.
+   */
+  void add(M item);
+
+  /**
+   * Clears all entries in the given queue.
+   */
+  void clear();
+
+  /**
+   * Polls for the next item in the queue (FIFO).
+   *
+   * @return a new item or null if none are present.
+   */
+  M poll();
+
+  /**
+   * @return how many items are in the queue.
+   */
+  int size();
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A global mutex based synchronized queue. Every operation is delegated to
+ * the wrapped {@link MessageQueue} while holding one single mutex. Instances
+ * are created through the static synchronize methods.
+ */
+public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+
+  private final MessageQueue<T> queue;
+  private final Object mutex;
+
+  /** Wraps the queue and guards it with a private, freshly created mutex. */
+  private SingleLockQueue(MessageQueue<T> queue) {
+    this(queue, new Object());
+  }
+
+  /** Wraps the queue and guards it with the given mutex. */
+  private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+    this.queue = queue;
+    this.mutex = mutex;
+  }
+
+  /*
+   * Only obtaining the iterator happens under the mutex; stepping through the
+   * returned iterator is not guarded by this class.
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#iterator()
+   */
+  @Override
+  public Iterator<T> iterator() {
+    synchronized (mutex) {
+      return queue.iterator();
+    }
+  }
+
+  /*
+   * @see
+   * org.apache.hama.bsp.message.queue.SynchronizedQueue#setConf(org.apache
+   * .hadoop.conf.Configuration)
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    synchronized (mutex) {
+      queue.setConf(conf);
+    }
+  }
+
+  /*
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#getConf()
+   */
+  @Override
+  public Configuration getConf() {
+    synchronized (mutex) {
+      return queue.getConf();
+    }
+  }
+
+  /*
+   * @see
+   * org.apache.hama.bsp.message.queue.SynchronizedQueue#init(org.apache
+   * .hadoop.conf.Configuration, org.apache.hama.bsp.TaskAttemptID)
+   */
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    synchronized (mutex) {
+      queue.init(conf, id);
+    }
+  }
+
+  /*
+   * The wrapped queue is closed while holding the mutex, so no other guarded
+   * operation can run on the queue in the middle of the close.
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#close()
+   */
+  @Override
+  public void close() {
+    synchronized (mutex) {
+      queue.close();
+    }
+  }
+
+  /*
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#prepareRead()
+   */
+  @Override
+  public void prepareRead() {
+    synchronized (mutex) {
+      queue.prepareRead();
+    }
+  }
+
+  /*
+   * @see
+   * org.apache.hama.bsp.message.queue.SynchronizedQueue#addAll(java.util
+   * .Collection)
+   */
+  @Override
+  public void addAll(Collection<T> col) {
+    synchronized (mutex) {
+      queue.addAll(col);
+    }
+  }
+
+  /*
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#add(T)
+   */
+  @Override
+  public void add(T item) {
+    synchronized (mutex) {
+      queue.add(item);
+    }
+  }
+
+  /*
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#clear()
+   */
+  @Override
+  public void clear() {
+    synchronized (mutex) {
+      queue.clear();
+    }
+  }
+
+  /*
+   * Declared with the covariant return type T; callers that go through the
+   * SynchronizedQueue interface still see Object.
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#poll()
+   */
+  @Override
+  public T poll() {
+    synchronized (mutex) {
+      return queue.poll();
+    }
+  }
+
+  /*
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#size()
+   */
+  @Override
+  public int size() {
+    synchronized (mutex) {
+      return queue.size();
+    }
+  }
+
+  /*
+   * Hands out the wrapped queue itself; calls made directly on it bypass the
+   * mutex of this class.
+   * @see org.apache.hama.bsp.message.queue.SynchronizedQueue#getMessageQueue()
+   */
+  @Override
+  public MessageQueue<T> getMessageQueue() {
+    synchronized (mutex) {
+      return queue;
+    }
+  }
+
+  /*
+   * static constructor methods to be type safe
+   */
+
+  /** Wraps the queue using a private mutex. */
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+    return new SingleLockQueue<T>(queue);
+  }
+
+  /** Wraps the queue using the supplied object as the mutex. */
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
+      Object mutex) {
+    return new SingleLockQueue<T>(queue, mutex);
+  }
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Message queue kept on a heap (Java's {@link PriorityQueue}), so messages
+ * are polled according to their natural ordering. Supports sorted receive and
+ * send. The lifecycle hooks do nothing.
+ */
+public final class SortedMessageQueue<M extends WritableComparable<M>>
+    implements MessageQueue<M> {
+
+  private final PriorityQueue<M> queue = new PriorityQueue<M>();
+  private Configuration conf;
+
+  // ---- lifecycle: empty, not needed to implement ----
+
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    // intentionally empty
+  }
+
+  @Override
+  public void prepareWrite() {
+    // intentionally empty
+  }
+
+  @Override
+  public void prepareRead() {
+    // intentionally empty
+  }
+
+  @Override
+  public void close() {
+    // intentionally empty
+  }
+
+  // ---- queue operations ----
+
+  /** Inserts a single message into the heap. */
+  @Override
+  public void add(M item) {
+    queue.offer(item);
+  }
+
+  /** Inserts all messages of the collection into the heap. */
+  @Override
+  public void addAll(Collection<M> col) {
+    queue.addAll(col);
+  }
+
+  /** Drains the other queue via poll() and inserts each polled message. */
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    for (M next = otherqueue.poll(); next != null; next = otherqueue.poll()) {
+      queue.offer(next);
+    }
+  }
+
+  /** Removes and returns the head of the heap, or null if it is empty. */
+  @Override
+  public M poll() {
+    return queue.poll();
+  }
+
+  @Override
+  public int size() {
+    return queue.size();
+  }
+
+  @Override
+  public void clear() {
+    queue.clear();
+  }
+
+  /**
+   * Returns the iterator of the backing PriorityQueue. NOTE: the JDK does not
+   * guarantee any particular traversal order for that iterator; use poll() to
+   * receive messages in sorted order.
+   */
+  @Override
+  public Iterator<M> iterator() {
+    return queue.iterator();
+  }
+
+  // ---- Configurable ----
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Synchronized Queue interface. Can be used to implement better synchronized
+ * datastructures.
+ */
+public interface SynchronizedQueue<T> extends Configurable {
+
+  /** @return an iterator over the items of the queue. */
+  Iterator<T> iterator();
+
+  /** Initializes the queue for the given task attempt. */
+  void init(Configuration conf, TaskAttemptID id);
+
+  /** Closes the queue. */
+  void close();
+
+  /** Prepares the queue for reading. */
+  void prepareRead();
+
+  /** Adds all items of the collection. */
+  void addAll(Collection<T> col);
+
+  /** Adds a single item. */
+  void add(T item);
+
+  /** Removes all items. */
+  void clear();
+
+  /**
+   * Polls the next item. Note that the return type is declared as Object, not
+   * T.
+   */
+  Object poll();
+
+  /** @return how many items are in the queue. */
+  int size();
+
+  /** @return the underlying message queue. */
+  MessageQueue<T> getMessageQueue();
+
+}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html Wed Nov 21 19:14:21 2012
@@ -0,0 +1,24 @@
+<html>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<body>
+Contains all queue implementations.
+</body>
+</html>
+
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Wed Nov 21 19:14:21 2012
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.Text;
import org.apache.hama.Constants;
import org.apache.hama.HamaCluster;
import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.DiskQueue;
import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.examples.ClassSerializePrinting;
public class TestBSPMasterGroomServer extends HamaCluster {
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Wed Nov 21 19:14:21 2012
@@ -51,7 +51,7 @@ import org.apache.hama.bsp.ft.AsyncRcvdM
import org.apache.hama.bsp.ft.FaultTolerantPeerService;
import org.apache.hama.bsp.message.MessageEventListener;
import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.bsp.sync.BSPPeerSyncClient;
import org.apache.hama.bsp.sync.PeerSyncClient;
import org.apache.hama.bsp.sync.SyncEvent;
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Wed Nov 21 19:14:21 2012
@@ -33,6 +33,7 @@ import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.queue.DiskQueue;
import org.apache.hama.bsp.message.type.BooleanMessage;
import org.apache.hama.bsp.message.type.DoubleMessage;
import org.apache.hama.bsp.message.type.IntegerMessage;
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java Wed Nov 21 19:14:21 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.IntWritable;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
import org.junit.Test;
public class TestDiskQueue extends TestCase {
Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Wed Nov 21 19:14:21 2012
@@ -32,6 +32,9 @@ import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.BSPPeerImpl;
import org.apache.hama.bsp.Counters;
import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
import org.apache.hama.util.BSPNetUtils;
public class TestHadoopMessageManager extends TestCase {