You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/05/28 10:32:21 UTC

svn commit: r1597946 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/bundle/ core/src/main/java/org/apache/hama/bsp/message/queue/ core/src/test/java/org/apache/hama/bsp/message/

Author: edwardyoon
Date: Wed May 28 08:32:20 2014
New Revision: 1597946

URL: http://svn.apache.org/r1597946
Log:
HAMA-886: Refactoring core.bundle package (edwardyoon)

Added:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed May 28 08:32:20 2014
@@ -16,6 +16,7 @@ Release 0.7.0 (unreleased changes)
 
   IMPROVEMENTS
   
+   HAMA-886: Refactoring core.bundle package (edwardyoon)
    HAMA-899: Add getAdjacentPeerNames() that returns the names of locally adjacent peers (edwardyoon)
    HAMA-847: Vertex should provide Counters (edwardyoon)
    HAMA-568: Add faster synchronized collections for message queues (edwardyoon)

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed May 28 08:32:20 2014
@@ -38,7 +38,7 @@ import org.apache.hama.bsp.message.compr
  * 
  */
 public class BSPMessageBundle<M extends Writable> implements Writable,
-    Iterable<M> {
+    Iterable<M>, BSPMessageBundleInterface<M> {
 
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
@@ -111,6 +111,10 @@ public class BSPMessageBundle<M extends 
     bundleSize++;
   }
 
+  public byte[] getBuffer() {
+    return byteBuffer.toByteArray();
+  }
+  
   public Iterator<M> iterator() {
     bis = new ByteArrayInputStream(byteBuffer.toByteArray());
     dis = new DataInputStream(bis);
@@ -193,7 +197,7 @@ public class BSPMessageBundle<M extends 
    * @return the byte length of messages
    * @throws IOException
    */
-  public long getLength() throws IOException {
+  public long getLength() {
     return bundleLength;
   }
 
@@ -219,4 +223,5 @@ public class BSPMessageBundle<M extends 
       bufferDos.write(temp);
     }
   }
+
 }

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java?rev=1597946&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java Wed May 28 08:32:20 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+
+public interface BSPMessageBundleInterface<M extends Writable> {
+
+  /**
+   * @return the number of the messages.
+   */
+  public int size();
+  
+  /**
+   * Add message to this bundle.
+   * 
+   * @param message BSPMessage to add.
+   */
+  public void addMessage(M message);
+  
+  /**
+   * @return the iterator.
+   */
+  public Iterator<M> iterator();
+  
+  /**
+   * @return the message buffer.
+   */
+  public byte[] getBuffer();
+  
+  /**
+   * @return the total byte length of messages
+   * @throws IOException
+   */
+  public long getLength();
+  
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java Wed May 28 08:32:20 2014
@@ -18,7 +18,7 @@
 package org.apache.hama.bsp.message.queue;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.BSPMessageBundle;
 
 public interface BSPMessageInterface<M extends Writable> {
   

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java Wed May 28 08:32:20 2014
@@ -18,7 +18,7 @@
 package org.apache.hama.bsp.message.queue;
 
 import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.BSPMessageBundle;
 
 public abstract class ByteArrayMessageQueue<M extends Writable> implements
     BSPMessageInterface<M>, MessageQueue<M> {

Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java?rev=1597946&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java Wed May 28 08:32:20 2014
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+/**
+ * Java object message queue.
+ *
+ * @param <M> Message type.
+ */
+public abstract class DefaultMessageQueue<M extends Writable> implements
+    BSPMessageInterface<M>, Iterable<M>, MessageQueue<M> {
+
+  @Override
+  public void add(BSPMessageBundle<M> bundle){
+    addAll(bundle);
+  }
+  
+}

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Wed May 28 08:32:20 2014
@@ -45,7 +45,7 @@ import org.apache.hama.bsp.TaskAttemptID
  * configuration. <br/>
  * <b>It is experimental to use.</b>
  */
-public final class DiskQueue<M extends Writable> extends POJOMessageQueue<M> {
+public final class DiskQueue<M extends Writable> extends DefaultMessageQueue<M> {
 
   public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Wed May 28 08:32:20 2014
@@ -23,9 +23,8 @@ import java.util.concurrent.PriorityBloc
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
-import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
 
 /**
  * Heap (Java's priority queue) based message queue implementation that supports
@@ -115,7 +114,7 @@ public final class SortedMemoryQueue<M e
 
   @Override
   public void add(BSPMessageBundle<M> bundle) {
-    addAll((POJOMessageBundle<M>) bundle);
+    addAll(bundle);
   }
 
   @Override

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Wed May 28 08:32:20 2014
@@ -32,8 +32,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.Constants;
 import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
-import org.apache.hama.bsp.message.bundle.HeapByteArrayBSPMessageBundle;
+import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
 import org.apache.hama.bsp.message.io.PreFetchCache;
 import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
@@ -346,8 +345,7 @@ public class SpillingQueue<M extends Wri
   @Override
   public void add(BSPMessageBundle<M> bundle) {
     try {
-      this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle<M>) bundle)
-          .getBuffer());
+      this.spillOutputBuffer.write(bundle.getBuffer());
     } catch (IOException e) {
       throw new RuntimeException(e);
     }