You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/19 04:36:38 UTC
svn commit: r1094843 - in /incubator/hama/trunk: ./
src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/
Author: edwardyoon
Date: Tue Apr 19 02:36:37 2011
New Revision: 1094843
URL: http://svn.apache.org/viewvc?rev=1094843&view=rev
Log:
Send messages in batches to reduce RPC overhead
Added:
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Apr 19 02:36:37 2011
@@ -8,16 +8,8 @@ Release 0.3 - Unreleased
IMPROVEMENTS
+ HAMA-380: Send messages in batches to reduce RPC overhead (Miklos Erdelyi via edwardyoon)
HAMA-362: Re-design a new data structure of BSPMessage (Thomas Jungblut via edwardyoon)
-
-Release 0.2.1 - Unreleased
-
- NEW FEATURES
-
- BUG FIXES
-
- IMPROVEMENTS
-
HAMA-369: Reduce overhead of local communications (Miklos Erdelyi via edwardyoon)
HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content
in conf files or not when building artifacts (ChiaHung Lin via edwardyoon)
Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1094843&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java Tue Apr 19 02:36:37 2011
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BSPMessageBundle stores a group of BSPMessages so that they can be sent in
+ * batch rather than individually.
+ *
+ * <p>
+ * Serialized form: the number of messages, followed by one record per message
+ * in list order. Each record starts with a boolean that is true when the class
+ * of the message differs from the class of the previous message (always true
+ * for the first message); when it is true the class name follows. The
+ * message's own serialized fields come last. Recording class changes allows a
+ * bundle to carry messages of different BSPMessage subclasses without
+ * repeating the class name for every message.
+ */
+public class BSPMessageBundle implements Writable {
+
+  private List<BSPMessage> messages = new ArrayList<BSPMessage>();
+
+  public BSPMessageBundle() {
+  }
+
+  /**
+   * Add message to this bundle.
+   *
+   * @param message BSPMessage to add.
+   */
+  public void addMessage(BSPMessage message) {
+    messages.add(message);
+  }
+
+  /**
+   * @return the list backing this bundle (not a copy).
+   */
+  public List<BSPMessage> getMessages() {
+    return messages;
+  }
+
+  /**
+   * Replaces the content of this bundle.
+   *
+   * @param messages the list to use as the new backing list.
+   */
+  public void setMessages(List<BSPMessage> messages) {
+    this.messages = messages;
+  }
+
+  /**
+   * Writes the message count and then every message, naming the message class
+   * whenever it differs from the class of the previously written message.
+   *
+   * @param out the output to serialize this bundle to.
+   * @throws IOException if writing to the output fails.
+   */
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(messages.size());
+    Class<?> previousClass = null;
+    for (BSPMessage message : messages) {
+      Class<?> messageClass = message.getClass();
+      // The first message always counts as a change because previousClass is
+      // still null at that point.
+      boolean classChanged = messageClass != previousClass;
+      out.writeBoolean(classChanged);
+      if (classChanged) {
+        out.writeUTF(messageClass.getName());
+        previousClass = messageClass;
+      }
+      message.write(out);
+    }
+  }
+
+  /**
+   * Replaces the content of this bundle with the messages read from the
+   * input. The bundle is only modified once all messages were read
+   * successfully.
+   *
+   * @param in the input to deserialize this bundle from.
+   * @throws IOException if reading fails, the message count is negative, or a
+   *           message class cannot be loaded or instantiated.
+   */
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    int numMessages = in.readInt();
+    if (numMessages < 0) {
+      throw new IOException("Invalid message count in bundle: " + numMessages);
+    }
+    List<BSPMessage> readMessages = new ArrayList<BSPMessage>(numMessages);
+    Class<? extends BSPMessage> clazz = null;
+    for (int i = 0; i < numMessages; ++i) {
+      if (in.readBoolean()) {
+        clazz = loadMessageClass(in.readUTF());
+      } else if (clazz == null) {
+        throw new IOException(
+            "Bundle does not name the class of its first message");
+      }
+      readMessages.add(readMessage(clazz, in));
+    }
+    messages = readMessages;
+  }
+
+  /**
+   * Loads the named class and checks that it is a BSPMessage type.
+   */
+  private static Class<? extends BSPMessage> loadMessageClass(String className)
+      throws IOException {
+    try {
+      return Class.forName(className).asSubclass(BSPMessage.class);
+    } catch (ClassNotFoundException ex) {
+      throw new IOException(ex);
+    } catch (ClassCastException ex) {
+      // The stream named a class that is not a BSPMessage.
+      throw new IOException(ex);
+    }
+  }
+
+  /**
+   * Instantiates a message of the given class and deserializes it.
+   */
+  private static BSPMessage readMessage(Class<? extends BSPMessage> clazz,
+      DataInput in) throws IOException {
+    try {
+      BSPMessage newMessage = clazz.newInstance();
+      newMessage.readFields(in);
+      return newMessage;
+    } catch (InstantiationException ex) {
+      throw new IOException(ex);
+    } catch (IllegalAccessException ex) {
+      throw new IOException(ex);
+    }
+  }
+
+}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 19 02:36:37 2011
@@ -175,26 +175,21 @@ public class BSPPeer implements Watcher,
enterBarrier();
Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
.entrySet().iterator();
- Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry;
- ConcurrentLinkedQueue<BSPMessage> queue;
- BSPPeerInterface peer;
-
- Iterator<BSPMessage> messages;
while (it.hasNext()) {
- entry = it.next();
+ Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
+ .next();
- peer = peers.get(entry.getKey());
+ BSPPeerInterface peer = peers.get(entry.getKey());
if (peer == null) {
peer = getBSPPeerConnection(entry.getKey());
}
- queue = entry.getValue();
- messages = queue.iterator();
-
- // TODO - to be improved by collective communication and compression
- while (messages.hasNext()) {
- peer.put(messages.next());
+ Iterable<BSPMessage> messages = entry.getValue();
+ BSPMessageBundle bundle = new BSPMessageBundle();
+ for (BSPMessage message : messages) {
+ bundle.addMessage(message);
}
+ peer.put(bundle);
}
waitForSync();
@@ -302,6 +297,13 @@ public class BSPPeer implements Watcher,
}
@Override
+ public void put(BSPMessageBundle messages) throws IOException {
+   // Unpack the bundle: each contained message is queued individually, in
+   // the order the bundle's list returns them.
+   for (BSPMessage received : messages.getMessages()) {
+     localQueueForNextIteration.add(received);
+   }
+ }
+
+ @Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return BSPPeerInterface.versionID;
}
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Tue Apr 19 02:36:37 2011
@@ -49,6 +49,14 @@ public interface BSPPeerInterface extend
public void put(BSPMessage msg) throws IOException;
/**
+ * Puts every message of the given bundle into the local queue, so that a
+ * batch of messages can be handed over in one call instead of one call per
+ * message.
+ *
+ * @param messages the bundle whose messages are to be queued
+ * @throws IOException if the implementation fails to accept the bundle
+ */
+ public void put(BSPMessageBundle messages) throws IOException;
+
+ /**
* @return A message from the peer's received messages queue (a FIFO).
* @throws IOException
*/
Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Apr 19 02:36:37 2011
@@ -242,7 +242,7 @@ public class GroomServer implements Runn
// TODO Use the cleanup thread
// tasksToCleanup.put(action);
-
+
KillTaskAction killAction = (KillTaskAction) action;
if (tasks.containsKey(killAction.getTaskID())) {
TaskInProgress tip = tasks.get(killAction.getTaskID());
@@ -848,6 +848,11 @@ public class GroomServer implements Runn
}
@Override
+ public void put(BSPMessageBundle messages) throws IOException {
+ bspPeer.put(messages); // pure delegation: the bundle is passed to bspPeer unchanged
+ }
+
+ @Override
public BSPMessage getCurrentMessage() throws IOException {
return bspPeer.getCurrentMessage();
}
Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1094843&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java Tue Apr 19 02:36:37 2011
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import junit.framework.TestCase;
+
+/**
+ * Round-trip tests for the serialization of BSPMessageBundle.
+ */
+public class TestBSPMessageBundle extends TestCase {
+
+  /**
+   * An empty bundle must deserialize to a bundle without messages.
+   */
+  public void testEmpty() throws IOException {
+    BSPMessageBundle readBundle = roundTrip(new BSPMessageBundle());
+    assertEquals(0, readBundle.getMessages().size());
+  }
+
+  /**
+   * A bundle of ByteMessages must come back with the same tags and data, in
+   * the same order.
+   */
+  public void testSerializationDeserialization() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    ByteMessage[] testMessages = new ByteMessage[16];
+    for (int i = 0; i < testMessages.length; ++i) {
+      // Create a one byte tag containing the number of the message.
+      byte[] tag = new byte[] { (byte) i };
+      // Create a four bytes data part containing the serialized number of
+      // the message. writeInt is required for that:
+      // ByteArrayOutputStream.write(int) would store a single byte only.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      DataOutputStream dataOut = new DataOutputStream(baos);
+      dataOut.writeInt(i);
+      dataOut.close();
+      testMessages[i] = new ByteMessage(tag, baos.toByteArray());
+      bundle.addMessage(testMessages[i]);
+    }
+
+    BSPMessageBundle readBundle = roundTrip(bundle);
+
+    // Compare the sizes first so that a surplus message fails this assertion
+    // instead of overrunning testMessages in the loop below.
+    assertEquals(testMessages.length, readBundle.getMessages().size());
+    int messageNumber = 0;
+    for (BSPMessage message : readBundle.getMessages()) {
+      ByteMessage byteMessage = (ByteMessage) message;
+      assertTrue(Arrays.equals(testMessages[messageNumber].getTag(),
+          byteMessage.getTag()));
+      assertTrue(Arrays.equals(testMessages[messageNumber].getData(),
+          byteMessage.getData()));
+      ++messageNumber;
+    }
+  }
+
+  /**
+   * Serializes the bundle into a byte array and reads it back into a new
+   * bundle.
+   */
+  private static BSPMessageBundle roundTrip(BSPMessageBundle bundle)
+      throws IOException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(baos);
+    bundle.write(out);
+    out.close();
+    BSPMessageBundle readBundle = new BSPMessageBundle();
+    readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
+        .toByteArray())));
+    return readBundle;
+  }
+}