You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/05/17 16:54:40 UTC

svn commit: r1339626 - in /incubator/hama/trunk: CHANGES.txt core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java

Author: tjungblut
Date: Thu May 17 14:54:40 2012
New Revision: 1339626

URL: http://svn.apache.org/viewvc?rev=1339626&view=rev
Log:
[HAMA-552]: Add a sorted message queue

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java
Modified:
    incubator/hama/trunk/CHANGES.txt

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1339626&r1=1339625&r2=1339626&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu May 17 14:54:40 2012
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.5 - April 10, 2012 
 
   NEW FEATURES
-   
+
+   HAMA-552: Add a sorted message queue (tjungblut)   
    HAMA-556: Graph package to support stopping the interations when the node changes are within the tolerance value as in the case of page rank (tjungblut)
    HAMA-508: Add clean plugin (Mikalai Parafeniuk via edwardyoon)
    HAMA-503: Chainable computations for fault tolerance (tjungblut)

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java?rev=1339626&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/SortedMessageQueue.java Thu May 17 14:54:40 2012
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+
+/**
+ * Heap (Java's priority queue) based message queue implementation that supports
+ * sorted receive and send. Only {@link #poll()} yields sorted order.
+ */
+public final class SortedMessageQueue<M extends WritableComparable<M>>
+    implements MessageQueue<M> {
+
+  private final PriorityQueue<M> queue = new PriorityQueue<M>();
+  private Configuration conf;
+
+  /** Iterates the backing heap directly, so the order is NOT sorted. */
+  @Override
+  public Iterator<M> iterator() {
+    return queue.iterator();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void addAll(Collection<M> col) {
+    queue.addAll(col);
+  }
+
+  /** Moves every message of the given queue into this one, emptying it. */
+  @Override
+  public void addAll(MessageQueue<M> otherqueue) {
+    // Draining ourselves would re-add each polled message and never terminate.
+    if (otherqueue == this) {
+      return;
+    }
+    for (M msg = otherqueue.poll(); msg != null; msg = otherqueue.poll()) {
+      queue.add(msg);
+    }
+  }
+
+  @Override
+  public void add(M item) {
+    queue.add(item);
+  }
+
+  @Override
+  public void clear() {
+    queue.clear();
+  }
+
+  /** Removes and returns the least message, or null when the queue is empty. */
+  @Override
+  public M poll() {
+    return queue.poll();
+  }
+
+  @Override
+  public int size() {
+    return queue.size();
+  }
+
+  // The remaining lifecycle hooks are empty, not needed to implement.
+  @Override
+  public void init(Configuration conf, TaskAttemptID id) {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public void prepareRead() {
+  }
+
+  @Override
+  public void prepareWrite() {
+  }
+}