You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2014/05/28 10:32:21 UTC
svn commit: r1597946 - in /hama/trunk: ./
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/message/bundle/
core/src/main/java/org/apache/hama/bsp/message/queue/
core/src/test/java/org/apache/hama/bsp/message/
Author: edwardyoon
Date: Wed May 28 08:32:20 2014
New Revision: 1597946
URL: http://svn.apache.org/r1597946
Log:
HAMA-886: Refactoring core.bundle package (edwardyoon)
Added:
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
Removed:
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/bundle/
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/POJOMessageQueue.java
hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestMessageBundle.java
Modified:
hama/trunk/CHANGES.txt
hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed May 28 08:32:20 2014
@@ -16,6 +16,7 @@ Release 0.7.0 (unreleased changes)
IMPROVEMENTS
+ HAMA-886: Refactoring core.bundle package (edwardyoon)
HAMA-899: Add getAdjacentPeerNames() that returns the names of locally adjacent peers (edwardyoon)
HAMA-847: Vertex should provide Counters (edwardyoon)
HAMA-568: Add faster synchronized collections for message queues (edwardyoon)
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed May 28 08:32:20 2014
@@ -38,7 +38,7 @@ import org.apache.hama.bsp.message.compr
*
*/
public class BSPMessageBundle<M extends Writable> implements Writable,
- Iterable<M> {
+ Iterable<M>, BSPMessageBundleInterface<M> {
public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
@@ -111,6 +111,10 @@ public class BSPMessageBundle<M extends
bundleSize++;
}
+ public byte[] getBuffer() {
+ return byteBuffer.toByteArray();
+ }
+
public Iterator<M> iterator() {
bis = new ByteArrayInputStream(byteBuffer.toByteArray());
dis = new DataInputStream(bis);
@@ -193,7 +197,7 @@ public class BSPMessageBundle<M extends
* @return the byte length of messages
* @throws IOException
*/
- public long getLength() throws IOException {
+ public long getLength() {
return bundleLength;
}
@@ -219,4 +223,5 @@ public class BSPMessageBundle<M extends
bufferDos.write(temp);
}
}
+
}
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java?rev=1597946&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundleInterface.java Wed May 28 08:32:20 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.io.Writable;
+
+public interface BSPMessageBundleInterface<M extends Writable> {
+
+ /**
+ * @return the number of messages in this bundle.
+ */
+ public int size();
+
+ /**
+ * Adds a message to this bundle.
+ *
+ * @param message the message to add.
+ */
+ public void addMessage(M message);
+
+ /**
+ * @return an iterator over the messages.
+ */
+ public Iterator<M> iterator();
+
+ /**
+ * @return the message buffer as a byte array.
+ */
+ public byte[] getBuffer();
+
+ /**
+ * Reports the size of the bundled messages in bytes.
+ * @return the total byte length of messages
+ */
+ public long getLength();
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/BSPMessageInterface.java Wed May 28 08:32:20 2014
@@ -18,7 +18,7 @@
package org.apache.hama.bsp.message.queue;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.BSPMessageBundle;
public interface BSPMessageInterface<M extends Writable> {
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/ByteArrayMessageQueue.java Wed May 28 08:32:20 2014
@@ -18,7 +18,7 @@
package org.apache.hama.bsp.message.queue;
import org.apache.hadoop.io.Writable;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
+import org.apache.hama.bsp.BSPMessageBundle;
public abstract class ByteArrayMessageQueue<M extends Writable> implements
BSPMessageInterface<M>, MessageQueue<M> {
Added: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java?rev=1597946&view=auto
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java (added)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DefaultMessageQueue.java Wed May 28 08:32:20 2014
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message.queue;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.bsp.BSPMessageBundle;
+
+/**
+ * Java object message queue; add(bundle) hands the bundle to addAll.
+ *
+ * @param <M> Message type.
+ */
+public abstract class DefaultMessageQueue<M extends Writable> implements
+ BSPMessageInterface<M>, Iterable<M>, MessageQueue<M> {
+
+ @Override
+ public void add(BSPMessageBundle<M> bundle){
+ addAll(bundle);
+ }
+
+}
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/DiskQueue.java Wed May 28 08:32:20 2014
@@ -45,7 +45,7 @@ import org.apache.hama.bsp.TaskAttemptID
* configuration. <br/>
* <b>It is experimental to use.</b>
*/
-public final class DiskQueue<M extends Writable> extends POJOMessageQueue<M> {
+public final class DiskQueue<M extends Writable> extends DefaultMessageQueue<M> {
public static final String DISK_QUEUE_PATH_KEY = "bsp.disk.queue.dir";
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SortedMemoryQueue.java Wed May 28 08:32:20 2014
@@ -23,9 +23,8 @@ import java.util.concurrent.PriorityBloc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
-import org.apache.hama.bsp.message.bundle.POJOMessageBundle;
/**
* Heap (Java's priority queue) based message queue implementation that supports
@@ -115,7 +114,7 @@ public final class SortedMemoryQueue<M e
@Override
public void add(BSPMessageBundle<M> bundle) {
- addAll((POJOMessageBundle<M>) bundle);
+ addAll(bundle);
}
@Override
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java?rev=1597946&r1=1597945&r2=1597946&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/message/queue/SpillingQueue.java Wed May 28 08:32:20 2014
@@ -32,8 +32,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hama.Constants;
import org.apache.hama.bsp.TaskAttemptID;
-import org.apache.hama.bsp.message.bundle.BSPMessageBundle;
-import org.apache.hama.bsp.message.bundle.HeapByteArrayBSPMessageBundle;
+import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
import org.apache.hama.bsp.message.io.PreFetchCache;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
@@ -346,8 +345,7 @@ public class SpillingQueue<M extends Wri
@Override
public void add(BSPMessageBundle<M> bundle) {
try {
- this.spillOutputBuffer.write(((HeapByteArrayBSPMessageBundle<M>) bundle)
- .getBuffer());
+ this.spillOutputBuffer.write(bundle.getBuffer());
} catch (IOException e) {
throw new RuntimeException(e);
}