You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2011/04/19 04:36:38 UTC

svn commit: r1094843 - in /incubator/hama/trunk: ./ src/java/org/apache/hama/bsp/ src/test/org/apache/hama/bsp/

Author: edwardyoon
Date: Tue Apr 19 02:36:37 2011
New Revision: 1094843

URL: http://svn.apache.org/viewvc?rev=1094843&view=rev
Log:
Send messages in batches to reduce RPC overhead

Added:
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
    incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Apr 19 02:36:37 2011
@@ -8,16 +8,8 @@ Release 0.3 - Unreleased
 
   IMPROVEMENTS
 
+    HAMA-380: Send messages in batches to reduce RPC overhead (Miklos Erdelyi via edwardyoon)
     HAMA-362: Re-design a new data structure of BSPMessage (Thomas Jungblut via edwardyoon)   
-
-Release 0.2.1 - Unreleased
-
-  NEW FEATURES
-
-  BUG FIXES
-
-  IMPROVEMENTS
-  
     HAMA-369: Reduce overhead of local communications (Miklos Erdelyi via edwardyoon)
     HAMA-371: Add a parameter to build.xml to allow developers deciding to reset content 
                        in conf files or not when building artifacts (ChiaHung Lin via edwardyoon)

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1094843&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPMessageBundle.java Tue Apr 19 02:36:37 2011
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * BSPMessageBundle stores a group of BSPMessages so that they can be sent in
+ * batch rather than individually. Only the class of the first message is
+ * serialized, so every message is read back as an instance of that class.
+ */
+public class BSPMessageBundle implements Writable {
+
+  private List<BSPMessage> messages = new ArrayList<BSPMessage>();
+
+  public BSPMessageBundle() {
+  }
+
+  /**
+   * Appends a message to the end of this bundle.
+   * 
+   * @param message BSPMessage to add.
+   */
+  public void addMessage(BSPMessage message) {
+    messages.add(message);
+  }
+
+  public List<BSPMessage> getMessages() {
+    return messages; // the backing list itself, not a copy
+  }
+
+  public void setMessages(List<BSPMessage> messages) {
+    this.messages = messages; // keeps a reference to the caller's list
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(messages.size()); // an empty bundle is just this count
+    if (messages.size() > 0) {
+      // Write the class name once, taken from the first message only.
+      BSPMessage element = messages.get(0);
+      Class<? extends BSPMessage> clazz = element.getClass();
+      out.writeUTF(clazz.getName());
+      // Serialize each message in list order, right after the class name.
+      for (BSPMessage message : messages) {
+        message.write(out);
+      }
+    }
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    int numMessages = in.readInt();
+    if (numMessages > 0) {
+      // Read the single class name that write() stored for all messages.
+      String className = in.readUTF();
+      Class<? extends BSPMessage> clazz = null;
+      try {
+        clazz = (Class<? extends BSPMessage>) Class.forName(className);
+      } catch (ClassNotFoundException ex) {
+        throw new IOException(ex); // reported as an I/O failure, cause kept
+      }
+      // Deserialize messages into a new list, discarding the previous one.
+      messages = new ArrayList<BSPMessage>(numMessages);
+      for (int i = 0; i < numMessages; ++i) {
+        try {
+          // Create an instance via newInstance() and let it read its fields.
+          BSPMessage newMessage = clazz.newInstance();
+          newMessage.readFields(in);
+          messages.add(newMessage);
+        } catch (InstantiationException ex) {
+          throw new IOException(ex);
+        } catch (IllegalAccessException ex) {
+          throw new IOException(ex);
+        }
+      }
+    } else {
+      messages = new ArrayList<BSPMessage>();
+    }
+  }
+
+}

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeer.java Tue Apr 19 02:36:37 2011
@@ -175,26 +175,21 @@ public class BSPPeer implements Watcher,
     enterBarrier();
     Iterator<Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>>> it = this.outgoingQueues
         .entrySet().iterator();
-    Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry;
-    ConcurrentLinkedQueue<BSPMessage> queue;
-    BSPPeerInterface peer;
-
-    Iterator<BSPMessage> messages;
 
     while (it.hasNext()) {
-      entry = it.next();
+      Entry<InetSocketAddress, ConcurrentLinkedQueue<BSPMessage>> entry = it
+          .next();
 
-      peer = peers.get(entry.getKey());
+      BSPPeerInterface peer = peers.get(entry.getKey());
       if (peer == null) {
         peer = getBSPPeerConnection(entry.getKey());
       }
-      queue = entry.getValue();
-      messages = queue.iterator();
-
-      // TODO - to be improved by collective communication and compression
-      while (messages.hasNext()) {
-        peer.put(messages.next());
+      Iterable<BSPMessage> messages = entry.getValue();
+      BSPMessageBundle bundle = new BSPMessageBundle();
+      for (BSPMessage message : messages) {
+        bundle.addMessage(message);
       }
+      peer.put(bundle);
     }
 
     waitForSync();
@@ -302,6 +297,13 @@ public class BSPPeer implements Watcher,
   }
 
   @Override
+  public void put(BSPMessageBundle messages) throws IOException {
+    for (BSPMessage message : messages.getMessages()) {
+      this.localQueueForNextIteration.add(message); // queued one by one
+    }
+  }
+
+  @Override
   public long getProtocolVersion(String arg0, long arg1) throws IOException {
     return BSPPeerInterface.versionID;
   }

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/BSPPeerInterface.java Tue Apr 19 02:36:37 2011
@@ -49,6 +49,14 @@ public interface BSPPeerInterface extend
   public void put(BSPMessage msg) throws IOException;
 
   /**
+   * Puts all messages of a bundle into the local queue.
+   * 
+   * @param messages the bundle of messages to put
+   * @throws IOException
+   */
+  public void put(BSPMessageBundle messages) throws IOException;
+
+  /**
    * @return A message from the peer's received messages queue (a FIFO).
    * @throws IOException
    */

Modified: incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java?rev=1094843&r1=1094842&r2=1094843&view=diff
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java (original)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/GroomServer.java Tue Apr 19 02:36:37 2011
@@ -242,7 +242,7 @@ public class GroomServer implements Runn
 
           // TODO Use the cleanup thread
           // tasksToCleanup.put(action);
-          
+
           KillTaskAction killAction = (KillTaskAction) action;
           if (tasks.containsKey(killAction.getTaskID())) {
             TaskInProgress tip = tasks.get(killAction.getTaskID());
@@ -848,6 +848,11 @@ public class GroomServer implements Runn
   }
 
   @Override
+  public void put(BSPMessageBundle messages) throws IOException {
+    bspPeer.put(messages); // hand the whole bundle on to bspPeer unchanged
+  }
+
+  @Override
   public BSPMessage getCurrentMessage() throws IOException {
     return bspPeer.getCurrentMessage();
   }

Added: incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java?rev=1094843&view=auto
==============================================================================
--- incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java (added)
+++ incubator/hama/trunk/src/test/org/apache/hama/bsp/TestBSPMessageBundle.java Tue Apr 19 02:36:37 2011
@@ -0,0 +1,81 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import junit.framework.TestCase;
+
+public class TestBSPMessageBundle extends TestCase {
+
+  public void testEmpty() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    // Serialize the empty bundle.
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    bundle.write(new DataOutputStream(baos));
+    baos.close();
+    // Deserialize it into a fresh bundle.
+    BSPMessageBundle readBundle = new BSPMessageBundle();
+    readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
+        .toByteArray())));
+    assertEquals(0, readBundle.getMessages().size());
+  }
+
+  public void testSerializationDeserialization() throws IOException {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+    ByteMessage[] testMessages = new ByteMessage[16];
+    for (int i = 0; i < testMessages.length; ++i) {
+      // Create a one byte tag containing the number of the message.
+      byte[] tag = new byte[1];
+      tag[0] = (byte) i;
+      // Create a one byte data part: baos.write(int) stores only the
+      // low-order byte of the message number.
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      baos.write(i);
+      baos.close();
+      byte[] data = baos.toByteArray();
+      testMessages[i] = new ByteMessage(tag, data);
+      bundle.addMessage(testMessages[i]);
+    }
+    // Serialize the bundle of 16 messages.
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    bundle.write(new DataOutputStream(baos));
+    baos.close();
+    // Deserialize it into a fresh bundle.
+    BSPMessageBundle readBundle = new BSPMessageBundle();
+    readBundle.readFields(new DataInputStream(new ByteArrayInputStream(baos
+        .toByteArray())));
+    // Check that tags and data come back unchanged, in the original order.
+    int messageNumber = 0;
+    for (BSPMessage message : readBundle.getMessages()) {
+      ByteMessage byteMessage = (ByteMessage) message;
+      assertTrue(Arrays.equals(testMessages[messageNumber].getTag(),
+          byteMessage.getTag()));
+      assertTrue(Arrays.equals(testMessages[messageNumber].getData(),
+          byteMessage.getData()));
+      ++messageNumber;
+    }
+    assertEquals(testMessages.length, messageNumber); // none lost or added
+  }
+}