You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ap...@apache.org on 2012/11/21 20:14:22 UTC

svn commit: r1412231 - in /hama/trunk/core/src: main/java/org/apache/hama/bsp/ main/java/org/apache/hama/bsp/message/ main/java/org/apache/hama/bsp/message/queue/ test/java/org/apache/hama/bsp/ test/java/org/apache/hama/bsp/message/

Author: apurv
Date: Wed Nov 21 19:14:21 2012
New Revision: 1412231

URL: http://svn.apache.org/viewvc?rev=1412231&view=rev
Log:
Abstracted queues in a separate package

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html
Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SingleLockQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SynchronizedQueue.java
Modified:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Wed Nov 21 19:14:21 2012
@@ -42,7 +42,7 @@ import org.apache.hama.bsp.ft.BSPFaultTo
 import org.apache.hama.bsp.ft.FaultTolerantPeerService;
 import org.apache.hama.bsp.message.MessageManager;
 import org.apache.hama.bsp.message.MessageManagerFactory;
-import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.sync.PeerSyncClient;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.bsp.sync.SyncServiceFactory;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AbstractMessageManager.java Wed Nov 21 19:14:21 2012
@@ -37,6 +37,11 @@ import org.apache.hama.bsp.BSPMessageBun
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.SingleLockQueue;
+import org.apache.hama.bsp.message.queue.SynchronizedQueue;
 import org.apache.hama.util.BSPNetUtils;
 
 /**

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManager.java Wed Nov 21 19:14:21 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.MessageQueue;
 
 /**
  * This manager takes care of the messaging. It is responsible to launch a

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A disk based queue that is backed by a raw file on local disk. <br/>
+ * Structure is as follows: <br/>
+ * If "bsp.disk.queue.dir" is not defined, "hama.tmp.dir" will be used instead. <br/>
+ * ${hama.tmp.dir}/diskqueue/job_id/task_attempt_id/ <br/>
+ * An ongoing sequence number will be appended to prevent inner collisions,
+ * however the job_id dir will never be deleted. So you need a cronjob to do the
+ * cleanup for you. <br/>
+ * It is recommended to use the file:// scheme in front of the property, because
+ * writes on DFS are expensive, however your local disk may not have enough
+ * space for your message, so you can easily switch per job via your
+ * configuration. <br/>
+ * <b>It is experimental to use.</b>
+ */
+public final class DiskQueue<M extends Writable> implements MessageQueue<M> {
+
+  public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
+
+  private static final int MAX_RETRIES = 4;
+  private static final Log LOG = LogFactory.getLog(DiskQueue.class);
+  // handed out only through nextSequenceNumber(), which holds the class lock
+  private static int ONGOING_SEQUENCE_NUMBER = 0;
+
+  // number of messages that were written and not yet polled
+  private int size = 0;
+  // injected via reflection through setConf(..), also stored by init(..)
+  private Configuration conf;
+  private FileSystem fs;
+  private FSDataOutputStream writer;
+  private FSDataInputStream reader;
+  private Path queuePath;
+  private TaskAttemptID id;
+  private final ObjectWritable writable = new ObjectWritable();
+
+  /**
+   * Creates the queue directory and a new, numbered queue file and opens it
+   * for writing. The configuration is stored so that clear() can
+   * re-initialize the queue even when setConf(..) was never called.
+   */
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    this.conf = conf;
+    this.id = id;
+    writable.setConf(conf);
+    try {
+      fs = FileSystem.get(conf);
+      Path queueDir = getQueueDir(conf, id, conf.get(DISK_QUEUE_PATH_KEY));
+      fs.mkdirs(queueDir);
+      queuePath = new Path(queueDir, nextSequenceNumber() + "_messages.seq");
+      prepareWrite();
+    } catch (IOException e) {
+      // we can't recover if something bad happens here..
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Hands out file sequence numbers; synchronized so none is given twice. */
+  private static synchronized int nextSequenceNumber() {
+    return ONGOING_SEQUENCE_NUMBER++;
+  }
+
+  /** Closes the streams, deletes the queue file and then its directory. */
+  @Override
+  public void close() {
+    closeInternal(true);
+    try {
+      fs.delete(queuePath.getParent(), true);
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  /**
+   * Closes the writer and the reader and optionally deletes the queue file.
+   * I/O errors are logged instead of thrown so that every step is attempted;
+   * close() is called even if flush() fails and the stream references are
+   * always dropped.
+   *
+   * @param delete true to also delete the queue file.
+   */
+  private void closeInternal(boolean delete) {
+    if (writer != null) {
+      try {
+        writer.flush();
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+      try {
+        writer.close();
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+      writer = null;
+    }
+    if (fs != null && delete) {
+      try {
+        fs.delete(queuePath, true);
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+    }
+    if (reader != null) {
+      try {
+        reader.close();
+      } catch (IOException e) {
+        LOG.error(e);
+      }
+      reader = null;
+    }
+  }
+
+  /** Closes the writer (keeping the file) and opens the file for reading. */
+  @Override
+  public void prepareRead() {
+    // make sure we've closed
+    closeInternal(false);
+    try {
+      reader = fs.open(queuePath);
+    } catch (IOException e) {
+      // can't recover from that
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Creates the queue file through the file system and keeps the writer. */
+  @Override
+  public void prepareWrite() {
+    try {
+      writer = fs.create(queuePath);
+    } catch (IOException e) {
+      // can't recover from that
+      LOG.error(e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public final void addAll(Collection<M> col) {
+    for (M item : col) {
+      add(item);
+    }
+  }
+
+  /** Drains the other queue via poll() and appends each polled message. */
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    for (M next = otherqueue.poll(); next != null; next = otherqueue.poll()) {
+      add(next);
+    }
+  }
+
+  /**
+   * Appends the message to the queue file. A write that fails is logged and
+   * the message is then not counted in size().
+   */
+  @Override
+  public final void add(M item) {
+    try {
+      new ObjectWritable(item).write(writer);
+      size++;
+    } catch (IOException e) {
+      LOG.error(e);
+    }
+  }
+
+  /** Discards all messages and re-initializes the queue with a new file. */
+  @Override
+  public final void clear() {
+    closeInternal(true);
+    size = 0;
+    init(conf, id);
+  }
+
+  /**
+   * Reads the next message from the queue file; when it was the last one,
+   * the streams are closed and the file is deleted.
+   *
+   * @return the next message or null if the queue is empty.
+   * @throws RuntimeException if reading failed MAX_RETRIES times.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public final M poll() {
+    if (size == 0) {
+      return null;
+    }
+    size--;
+    for (int tries = 1; tries <= MAX_RETRIES; tries++) {
+      try {
+        writable.readFields(reader);
+        if (size == 0) {
+          closeInternal(true);
+        }
+        return (M) writable.get();
+      } catch (IOException e) {
+        LOG.error("Retrying for the " + tries + "th time!", e);
+      }
+    }
+    throw new RuntimeException("Message couldn't be read for " + MAX_RETRIES
+        + " times! Giving up!");
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  /** Switches the queue to reading; the iterator consumes it via poll(). */
+  @Override
+  public Iterator<M> iterator() {
+    return new DiskIterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /** Consuming iterator: every next() polls a message off the queue. */
+  private class DiskIterator implements Iterator<M> {
+    public DiskIterator() {
+      prepareRead();
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (size == 0) {
+        closeInternal(false);
+      }
+      return size != 0;
+    }
+
+    @Override
+    public M next() {
+      return poll();
+    }
+
+    @Override
+    public void remove() {
+      // no-op
+    }
+  }
+
+  /**
+   * Resolves the directory for the queue files of a task. The base is the
+   * configured queue dir, else "hama.tmp.dir", else "/tmp/messageStorage/".
+   */
+  public static Path getQueueDir(Configuration conf, TaskAttemptID id,
+      String configuredQueueDir) {
+    String baseDir = configuredQueueDir;
+    if (baseDir == null) {
+      baseDir = conf.get("hama.tmp.dir", "/tmp/messageStorage/");
+    }
+    return createDiskQueuePath(id, baseDir);
+  }
+
+  /**
+   * Builds the queue directory for a task below the given base path.
+   * Structure is as follows: ${configuredPath}/diskqueue/job_id/task_id/,
+   * where both ids are taken from the task attempt id.
+   */
+  private static Path createDiskQueuePath(TaskAttemptID id,
+      String configuredPath) {
+    return new Path(new Path(new Path(configuredPath, "diskqueue"), id
+        .getJobID().toString()), id.getTaskID().toString());
+  }
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MemoryQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.ArrayDeque;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * ArrayDeque backed queue structure for bookkeeping messages.
+ */
+public final class MemoryQueue<M extends Writable> implements MessageQueue<M> {
+
+  private final Deque<M> deque = new ArrayDeque<M>();
+  private Configuration conf;
+
+  /** Appends all elements of the collection to the tail of the deque. */
+  @Override
+  public final void addAll(Collection<M> col) {
+    deque.addAll(col);
+  }
+
+  /** Drains the other queue via poll() and appends each polled message. */
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    for (M next = otherqueue.poll(); next != null; next = otherqueue.poll()) {
+      deque.addLast(next);
+    }
+  }
+
+  /** Appends a single message at the tail. */
+  @Override
+  public final void add(M item) {
+    deque.addLast(item);
+  }
+
+  /** Removes all messages. */
+  @Override
+  public final void clear() {
+    deque.clear();
+  }
+
+  /** Removes and returns the head message, or null when empty. */
+  @Override
+  public final M poll() {
+    return deque.pollFirst();
+  }
+
+  @Override
+  public final int size() {
+    return deque.size();
+  }
+
+  @Override
+  public final Iterator<M> iterator() {
+    return deque.iterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  // The in-memory queue has no resources or read/write phases to manage, so
+  // the lifecycle callbacks below are intentionally empty.
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void prepareRead() {
+  }
+
+  @Override
+  public void prepareWrite() {
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/MessageQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Simple queue interface.
+ */
+public interface MessageQueue<M> extends Iterable<M>, Configurable {
+
+  /**
+   * Initializes the queue with the given configuration and task attempt id.
+   */
+  void init(Configuration conf, TaskAttemptID id);
+
+  /**
+   * Closes the queue for good, typically releasing the resources it holds.
+   */
+  void close();
+
+  /**
+   * Prepares the queue for being read.
+   */
+  void prepareRead();
+
+  /**
+   * Prepares the queue for being written to.
+   */
+  void prepareWrite();
+
+  /**
+   * Adds every element of a Java Collection to this queue.
+   */
+  void addAll(Collection<M> col);
+
+  /**
+   * Adds the content of another queue to this queue.
+   */
+  void addAll(MessageQueue<M> otherqueue);
+
+  /**
+   * Adds one item to this queue.
+   */
+  void add(M item);
+
+  /**
+   * Removes all entries from this queue.
+   */
+  void clear();
+
+  /**
+   * Retrieves the next item of the queue in FIFO order.
+   *
+   * @return the next item, or null if the queue holds none.
+   */
+  M poll();
+
+  /**
+   * @return the number of items currently in the queue.
+   */
+  int size();
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SingleLockQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * A global mutex based synchronized queue.
+ */
+public final class SingleLockQueue<T> implements SynchronizedQueue<T> {
+
+  private final MessageQueue<T> queue;
+  private final Object mutex;
+
+  /** Wraps the queue and guards it with a freshly created mutex. */
+  private SingleLockQueue(MessageQueue<T> queue) {
+    this(queue, new Object());
+  }
+
+  /** Wraps the queue and guards it with the mutex supplied by the caller. */
+  private SingleLockQueue(MessageQueue<T> queue, Object mutex) {
+    this.queue = queue;
+    this.mutex = mutex;
+  }
+
+  /**
+   * Obtains the wrapped queue's iterator while holding the mutex. Traversing
+   * the returned iterator is not guarded by this class.
+   */
+  @Override
+  public Iterator<T> iterator() {
+    synchronized (mutex) {
+      return queue.iterator();
+    }
+  }
+
+  /**
+   * Hands the configuration to the wrapped queue while holding the mutex.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    synchronized (mutex) {
+      queue.setConf(conf);
+    }
+  }
+
+  /**
+   * @return the configuration of the wrapped queue, read while holding the
+   *         mutex.
+   */
+  @Override
+  public Configuration getConf() {
+    synchronized (mutex) {
+      return queue.getConf();
+    }
+  }
+
+  /**
+   * Initializes the wrapped queue while holding the mutex.
+   *
+   * @param conf the configuration forwarded to the wrapped queue.
+   * @param id the task attempt id forwarded to the wrapped queue.
+   */
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+    synchronized (mutex) {
+      queue.init(conf, id);
+    }
+  }
+
+  /**
+   * Closes the wrapped queue. The call is made while holding the mutex, so it
+   * cannot interleave with another operation made through this wrapper.
+   */
+  @Override
+  public void close() {
+    synchronized (mutex) {
+      queue.close();
+    }
+  }
+
+  /**
+   * Prepares the wrapped queue for reading while holding the mutex.
+   */
+  @Override
+  public void prepareRead() {
+    synchronized (mutex) {
+      queue.prepareRead();
+    }
+  }
+
+  /**
+   * Adds every element of the collection to the wrapped queue in a single
+   * critical section.
+   */
+  @Override
+  public void addAll(Collection<T> col) {
+    synchronized (mutex) {
+      queue.addAll(col);
+    }
+  }
+
+  /**
+   * Adds a single item to the wrapped queue while holding the mutex.
+   */
+  @Override
+  public void add(T item) {
+    synchronized (mutex) {
+      queue.add(item);
+    }
+  }
+
+  /**
+   * Clears the wrapped queue while holding the mutex.
+   */
+  @Override
+  public void clear() {
+    synchronized (mutex) {
+      queue.clear();
+    }
+  }
+
+  /**
+   * Polls the wrapped queue while holding the mutex.
+   *
+   * @return whatever the wrapped queue's poll returns; typed as Object
+   *         because the SynchronizedQueue interface declares it that way.
+   */
+  @Override
+  public Object poll() {
+    synchronized (mutex) {
+      return queue.poll();
+    }
+  }
+
+  /**
+   * @return the size reported by the wrapped queue, read while holding the
+   *         mutex.
+   */
+  @Override
+  public int size() {
+    synchronized (mutex) {
+      return queue.size();
+    }
+  }
+
+  /**
+   * Exposes the wrapped queue itself. Operations invoked directly on the
+   * returned queue bypass this wrapper and therefore its mutex.
+   *
+   * @return the wrapped, unsynchronized queue.
+   */
+  @Override
+  public MessageQueue<T> getMessageQueue() {
+    synchronized (mutex) {
+      return queue;
+    }
+  }
+
+  /** Static factory: wraps the queue with a lock private to the wrapper. */
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue) {
+    return new SingleLockQueue<T>(queue);
+  }
+
+  /**
+   * Static factory: wraps the queue and guards it with the lock object that
+   * the caller provides.
+   */
+  public static <T> SynchronizedQueue<T> synchronize(MessageQueue<T> queue,
+      Object mutex) {
+    return new SingleLockQueue<T>(queue, mutex);
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMessageQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Heap (Java's priority queue) based message queue implementation that supports
+ * sorted receive and send.
+ */
+public final class SortedMessageQueue<M extends WritableComparable<M>>
+    implements MessageQueue<M> {
+
+  private final PriorityQueue<M> queue = new PriorityQueue<M>();
+  private Configuration conf;
+
+  /**
+   * Returns the iterator of the backing PriorityQueue, which java.util does
+   * not guarantee to traverse in sorted order; use poll() for sorted reads.
+   */
+  @Override
+  public Iterator<M> iterator() {
+    return queue.iterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void addAll(Collection<M> col) {
+    queue.addAll(col);
+  }
+
+  /** Drains the other queue via poll() and inserts each polled message. */
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    for (M next = otherqueue.poll(); next != null; next = otherqueue.poll()) {
+      queue.offer(next);
+    }
+  }
+
+  /** Inserts a single message into the heap. */
+  @Override
+  public void add(M item) {
+    queue.offer(item);
+  }
+
+  @Override
+  public void clear() {
+    queue.clear();
+  }
+
+  /** Removes and returns the least message, or null when empty. */
+  @Override
+  public M poll() {
+    return queue.poll();
+  }
+
+  @Override
+  public int size() {
+    return queue.size();
+  }
+
+  // The heap lives in memory only, so the lifecycle callbacks stay empty.
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void prepareRead() {
+  }
+
+  @Override
+  public void prepareWrite() {
+  }
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SynchronizedQueue.java Wed Nov 21 19:14:21 2012
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Synchronized Queue interface. Can be used to implement better synchronized
+ * datastructures.
+ */
+public interface SynchronizedQueue<T> extends Configurable {
+
+  // Lifecycle operations, named like their MessageQueue counterparts.
+  void init(Configuration conf, TaskAttemptID id);
+  void prepareRead();
+  void close();
+
+  // Operations that put messages into the queue or drop them.
+  void addAll(Collection<T> col);
+  void add(T item);
+  void clear();
+
+  // Operations that inspect the queue without a dedicated return contract.
+  Iterator<T> iterator();
+  int size();
+
+  /** Declared to return Object rather than T. */
+  Object poll();
+
+  /** @return the MessageQueue behind this synchronized queue. */
+  MessageQueue<T> getMessageQueue();
+
+}

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html?rev=1412231&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/package.html Wed Nov 21 19:14:21 2012
@@ -0,0 +1,24 @@
+<html>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<body>
+Contains all queue implementations. 
+</body>
+</html>
+

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestBSPMasterGroomServer.java Wed Nov 21 19:14:21 2012
@@ -31,8 +31,8 @@ import org.apache.hadoop.io.Text;
 import org.apache.hama.Constants;
 import org.apache.hama.HamaCluster;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.message.DiskQueue;
 import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.examples.ClassSerializePrinting;
 
 public class TestBSPMasterGroomServer extends HamaCluster {

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Wed Nov 21 19:14:21 2012
@@ -51,7 +51,7 @@ import org.apache.hama.bsp.ft.AsyncRcvdM
 import org.apache.hama.bsp.ft.FaultTolerantPeerService;
 import org.apache.hama.bsp.message.MessageEventListener;
 import org.apache.hama.bsp.message.MessageManager;
-import org.apache.hama.bsp.message.MessageQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.bsp.sync.BSPPeerSyncClient;
 import org.apache.hama.bsp.sync.PeerSyncClient;
 import org.apache.hama.bsp.sync.SyncEvent;

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Wed Nov 21 19:14:21 2012
@@ -33,6 +33,7 @@ import org.apache.hama.bsp.Counters;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.message.compress.BSPMessageCompressorFactory;
 import org.apache.hama.bsp.message.compress.SnappyCompressor;
+import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.apache.hama.bsp.message.type.BooleanMessage;
 import org.apache.hama.bsp.message.type.DoubleMessage;
 import org.apache.hama.bsp.message.type.IntegerMessage;

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestDiskQueue.java Wed Nov 21 19:14:21 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hama.bsp.TaskAttemptID;
 import org.apache.hama.bsp.TaskID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
 import org.junit.Test;
 
 public class TestDiskQueue extends TestCase {

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1412231&r1=1412230&r2=1412231&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Wed Nov 21 19:14:21 2012
@@ -32,6 +32,9 @@ import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.BSPPeerImpl;
 import org.apache.hama.bsp.Counters;
 import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.bsp.message.queue.DiskQueue;
+import org.apache.hama.bsp.message.queue.MemoryQueue;
+import org.apache.hama.bsp.message.queue.MessageQueue;
 import org.apache.hama.util.BSPNetUtils;
 
 public class TestHadoopMessageManager extends TestCase {