You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/02/08 13:29:23 UTC

svn commit: r1241883 - in /incubator/hama/trunk: ./ conf/ core/ core/src/main/java/org/apache/hama/bsp/message/ core/src/test/java/org/apache/hama/bsp/message/

Author: edwardyoon
Date: Wed Feb  8 12:29:23 2012
New Revision: 1241883

URL: http://svn.apache.org/viewvc?rev=1241883&view=rev
Log:
Add Avro RPC.

Added:
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/core/pom.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
    incubator/hama/trunk/pom.xml

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Feb  8 12:29:23 2012
@@ -3,9 +3,10 @@ Hama Change Log
 Release 0.5 - Unreleased
 
   NEW FEATURES
-  
+
+   HAMA-501: Add Avro RPC (tjungblut)
    HAMA-456: Add basic Graph interfaces and GraphJobRunner (edwardyoon)
-  
+
   BUG FIXES
 
   IMPROVEMENTS

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Wed Feb  8 12:29:23 2012
@@ -156,6 +156,11 @@
   </property>
   
   <property>
+    <name>hama.messanger.class</name>
+    <value>org.apache.hama.bsp.message.AvroMessageManagerImpl</value>
+  </property>
+  
+  <property>
     <name>hama.zookeeper.quorum</name>
     <value>localhost</value>
     <description>Comma separated list of servers in the ZooKeeper quorum.

Modified: incubator/hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/pom.xml?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/core/pom.xml (original)
+++ incubator/hama/trunk/core/pom.xml Wed Feb  8 12:29:23 2012
@@ -91,11 +91,30 @@
       <artifactId>hadoop-test</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.avro</groupId>
+      <artifactId>avro-ipc</artifactId>
+    </dependency>
+      <dependency>
+        <groupId>org.jboss.netty</groupId>
+        <artifactId>netty</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-core-asl</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.jackson</groupId>
+      <artifactId>jackson-mapper-asl</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
-
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java Wed Feb  8 12:29:23 2012
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+public final class AvroBSPMessageBundle extends SpecificRecordBase implements
+    SpecificRecord {
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema
+      .parse("{\"type\":\"record\",\"name\":\"AvroBSPMessage\",\"namespace\":\"de.jungblut.avro\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}");
+  @Deprecated
+  public java.nio.ByteBuffer data;
+
+  public final org.apache.avro.Schema getSchema() {
+    return SCHEMA$;
+  }
+
+  // Used by DatumWriter. Applications should not call.
+  public final java.lang.Object get(int field$) {
+    switch (field$) {
+      case 0:
+        return data;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  // Used by DatumReader. Applications should not call.
+  public final void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+      case 0:
+        data = (java.nio.ByteBuffer) value$;
+        break;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'data' field.
+   */
+  public final java.nio.ByteBuffer getData() {
+    return data;
+  }
+
+  /**
+   * Sets the value of the 'data' field.
+   * 
+   * @param value the value to set.
+   */
+  public final void setData(java.nio.ByteBuffer value) {
+    this.data = value;
+  }
+
+  /** Creates a new AvroBSPMessage RecordBuilder */
+  public final static AvroBSPMessageBundle.Builder newBuilder() {
+    return new AvroBSPMessageBundle.Builder();
+  }
+
+  /** Creates a new AvroBSPMessage RecordBuilder by copying an existing Builder */
+  public final static AvroBSPMessageBundle.Builder newBuilder(
+      AvroBSPMessageBundle.Builder other) {
+    return new AvroBSPMessageBundle.Builder(other);
+  }
+
+  /**
+   * Creates a new AvroBSPMessage RecordBuilder by copying an existing
+   * AvroBSPMessage instance
+   */
+  public final static AvroBSPMessageBundle.Builder newBuilder(
+      AvroBSPMessageBundle other) {
+    return new AvroBSPMessageBundle.Builder(other);
+  }
+
+  /**
+   * RecordBuilder for AvroBSPMessage instances.
+   */
+  public final static class Builder extends
+      org.apache.avro.specific.SpecificRecordBuilderBase<AvroBSPMessageBundle>
+      implements org.apache.avro.data.RecordBuilder<AvroBSPMessageBundle> {
+
+    private java.nio.ByteBuffer data;
+
+    /** Creates a new Builder */
+    private Builder() {
+      super(AvroBSPMessageBundle.SCHEMA$);
+    }
+
+    /** Creates a Builder by copying an existing Builder */
+    private Builder(AvroBSPMessageBundle.Builder other) {
+      super(other);
+    }
+
+    /** Creates a Builder by copying an existing AvroBSPMessage instance */
+    private Builder(AvroBSPMessageBundle other) {
+      super(AvroBSPMessageBundle.SCHEMA$);
+      if (isValidValue(fields[0], other.data)) {
+        data = (java.nio.ByteBuffer) clone(other.data);
+        fieldSetFlags[0] = true;
+      }
+    }
+
+    public final ByteBuffer clone(ByteBuffer original) {
+      ByteBuffer clone = ByteBuffer.allocate(original.capacity());
+      original.rewind();
+      clone.put(original);
+      original.rewind();
+      clone.flip();
+      return clone;
+    }
+
+    /** Gets the value of the 'data' field */
+    public final java.nio.ByteBuffer getData() {
+      return data;
+    }
+
+    /** Sets the value of the 'data' field */
+    public final AvroBSPMessageBundle.Builder setData(java.nio.ByteBuffer value) {
+      validate(fields[0], value);
+      this.data = value;
+      fieldSetFlags[0] = true;
+      return this;
+    }
+
+    /** Checks whether the 'data' field has been set */
+    public final boolean hasData() {
+      return fieldSetFlags[0];
+    }
+
+    /** Clears the value of the 'data' field */
+    public final AvroBSPMessageBundle.Builder clearData() {
+      data = null;
+      fieldSetFlags[0] = false;
+      return this;
+    }
+
+    @Override
+    public final AvroBSPMessageBundle build() {
+      try {
+        AvroBSPMessageBundle record = new AvroBSPMessageBundle();
+        record.data = fieldSetFlags[0] ? this.data
+            : (java.nio.ByteBuffer) getDefaultValue(fields[0]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Wed Feb  8 12:29:23 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.util.BSPNetUtils;
+
+public class AvroMessageManagerImpl implements MessageManager, Sender {
+
+  private static final Log LOG = LogFactory
+      .getLog(AvroMessageManagerImpl.class);
+
+  private NettyServer server = null;
+
+  private final HashMap<InetSocketAddress, Sender> peers = new HashMap<InetSocketAddress, Sender>();
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+
+  private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<BSPMessage>>();
+  private Deque<BSPMessage> localQueue = new LinkedList<BSPMessage>();
+  // this must be a synchronized implementation: this is accessed per RPC
+  private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+
+  @Override
+  public void init(Configuration conf, InetSocketAddress addr) {
+    server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
+  }
+
+  @Override
+  public void close() {
+    server.close();
+  }
+
+  @Override
+  public void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+    localQueue.addAll(localQueueForNextIteration);
+    localQueueForNextIteration.clear();
+  }
+
+  public void put(BSPMessageBundle messages) {
+    for (BSPMessage message : messages.getMessages()) {
+      this.localQueueForNextIteration.add(message);
+    }
+  }
+
+  @Override
+  public int getNumCurrentMessages() {
+    return localQueue.size();
+  }
+
+  @Override
+  public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+      throws IOException {
+    AvroBSPMessageBundle msg = new AvroBSPMessageBundle();
+    msg.setData(serializeMessage(bundle));
+    Sender sender = peers.get(addr);
+
+    if (sender == null) {
+      NettyTransceiver client = new NettyTransceiver(addr);
+      sender = (Sender) SpecificRequestor.getClient(Sender.class, client);
+      peers.put(addr, sender);
+    }
+    
+    sender.transfer(msg);
+  }
+
+  @Override
+  public Void transfer(AvroBSPMessageBundle messagebundle)
+      throws AvroRemoteException {
+    try {
+      this.put(deserializeMessage(messagebundle.getData()));
+    } catch (IOException e) {
+      // Propagate to the sender instead of silently dropping the bundle.
+      LOG.error("Failed to deserialize incoming message bundle", e);
+      throw new AvroRemoteException(e);
+    }
+    return null;
+  }
+
+  @Override
+  public BSPMessage getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  @Override
+  public void send(String peerName, BSPMessage msg) throws IOException {
+    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
+    InetSocketAddress targetPeerAddress = null;
+    // Get socket for target peer.
+    if (peerSocketCache.containsKey(peerName)) {
+      targetPeerAddress = peerSocketCache.get(peerName);
+    } else {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    LinkedList<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+    if (queue == null) {
+      queue = new LinkedList<BSPMessage>();
+    }
+    queue.add(msg);
+    outgoingQueues.put(targetPeerAddress, queue);
+  }
+
+  private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer)
+      throws IOException {
+    BSPMessageBundle msg = new BSPMessageBundle();
+    // Read only the buffer's readable window, not the whole backing array.
+    ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array(),
+        buffer.arrayOffset() + buffer.position(), buffer.remaining());
+    DataInputStream in = new DataInputStream(inArray);
+    msg.readFields(in);
+    return msg;
+  }
+
+  private static final ByteBuffer serializeMessage(BSPMessageBundle msg)
+      throws IOException {
+    ByteArrayOutputStream outArray = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(outArray);
+    msg.write(out);
+    out.close();
+    System.out.println("serialized " + outArray.size() + " bytes");
+    return ByteBuffer.wrap(outArray.toByteArray());
+  }
+
+  @Override
+  public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
+    return this.outgoingQueues.entrySet().iterator();
+  }
+}

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java Wed Feb  8 12:29:23 2012
@@ -34,7 +34,7 @@ public class MessageManagerFactory {
       throws ClassNotFoundException {
     return (MessageManager) ReflectionUtils.newInstance(conf
         .getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
-            org.apache.hama.bsp.message.HadoopMessageManagerImpl.class
+            org.apache.hama.bsp.message.AvroMessageManagerImpl.class
                 .getCanonicalName())), conf);
   }
 

Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java Wed Feb  8 12:29:23 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+public interface Sender {
+  public static final org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol
+      .parse("{\"protocol\":\"Sender\",\"namespace\":\"de.jungblut.avro\",\"types\":[{\"type\":\"record\",\"name\":\"AvroBSPMessageBundle\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}],\"messages\":{\"transfer\":{\"request\":[{\"name\":\"messagebundle\",\"type\":\"AvroBSPMessageBundle\"}],\"response\":\"null\"}}}");
+
+  java.lang.Void transfer(AvroBSPMessageBundle messagebundle)
+      throws org.apache.avro.AvroRemoteException;
+
+  @SuppressWarnings("all")
+  public interface Callback extends Sender {
+    public static final org.apache.avro.Protocol PROTOCOL = Sender.PROTOCOL;
+
+    void transfer(AvroBSPMessageBundle messagebundle,
+        org.apache.avro.ipc.Callback<java.lang.Void> callback)
+        throws java.io.IOException;
+  }
+}

Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Wed Feb  8 12:29:23 2012
@@ -0,0 +1,151 @@
+package org.apache.hama.bsp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BooleanMessage;
+import org.apache.hama.bsp.DoubleMessage;
+import org.apache.hama.bsp.IntegerMessage;
+
+public class TestAvroMessageManager {
+
+  private static NettyServer server;
+  private static Server hadoopServer;
+  private static long start;
+
+  public static final class MessageSender implements Sender {
+
+    @Override
+    public Void transfer(AvroBSPMessageBundle messagebundle)
+        throws AvroRemoteException {
+      try {
+        BSPMessageBundle msg = deserializeMessage(messagebundle.data);
+        System.out.println("Received message in "
+            + (System.currentTimeMillis() - start) + "ms");
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      return null;
+    }
+
+  }
+
+  private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer)
+      throws IOException {
+    BSPMessageBundle msg = new BSPMessageBundle();
+    // Read only the buffer's readable window, not the whole backing array.
+    ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array(),
+        buffer.arrayOffset() + buffer.position(), buffer.remaining());
+    DataInputStream in = new DataInputStream(inArray);
+    msg.readFields(in);
+    return msg;
+  }
+
+  private static final ByteBuffer serializeMessage(BSPMessageBundle msg)
+      throws IOException {
+    ByteArrayOutputStream outArray = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(outArray);
+    msg.write(out);
+    out.close();
+    System.out.println("serialized " + outArray.size() + " bytes");
+    return ByteBuffer.wrap(outArray.toByteArray());
+  }
+
+  public static final BSPMessageBundle getRandomBundle() {
+    BSPMessageBundle bundle = new BSPMessageBundle();
+
+    for (int i = 0; i < 500000; i++) {
+      bundle.addMessage(new IntegerMessage("test", i));
+    }
+
+    for (int i = 0; i < 10000; i++) {
+      bundle.addMessage(new BooleanMessage("test123", i % 2 == 0));
+    }
+
+    Random r = new Random();
+    for (int i = 0; i < 400000; i++) {
+      bundle.addMessage(new DoubleMessage("123123asd", r.nextDouble()));
+    }
+
+    return bundle;
+  }
+
+  public static final void main(String[] args) throws IOException {
+    BSPMessageBundle randomBundle = getRandomBundle();
+    testAvro(randomBundle);
+    testHadoop(randomBundle);
+  }
+
+  private static final void testAvro(BSPMessageBundle bundle)
+      throws IOException, AvroRemoteException {
+
+    server = new NettyServer(new SpecificResponder(Sender.class,
+        new MessageSender()), new InetSocketAddress(13530));
+
+    NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(server
+        .getPort()));
+    Sender proxy = (Sender) SpecificRequestor.getClient(Sender.class, client);
+
+    AvroBSPMessageBundle msg = new AvroBSPMessageBundle();
+
+    msg.setData(serializeMessage(bundle));
+
+    start = System.currentTimeMillis();
+    proxy.transfer(msg);
+
+    server.close();
+    client.close();
+  }
+
+  private static interface RPCTestInterface extends VersionedProtocol {
+
+    public void transfer(BSPMessageBundle bundle);
+
+  }
+
+  private static class HadoopRPCInstance implements RPCTestInterface {
+
+    @Override
+    public long getProtocolVersion(String arg0, long arg1) throws IOException {
+      return 0;
+    }
+
+    @Override
+    public void transfer(BSPMessageBundle bundle) {
+      System.out.println("Received message in "
+          + (System.currentTimeMillis() - start) + "ms");
+    }
+
+  }
+
+  private static final void testHadoop(BSPMessageBundle bundle)
+      throws IOException {
+    Configuration conf = new Configuration();
+    HadoopRPCInstance hadoopRPCInstance = new HadoopRPCInstance();
+    hadoopServer = new Server(hadoopRPCInstance, conf, new InetSocketAddress(
+        13612).getHostName(), 13612);
+    hadoopServer.start();
+    RPCTestInterface proxy = (RPCTestInterface) RPC.getProxy(
+        RPCTestInterface.class, 0, new InetSocketAddress(13612), conf);
+    start = System.currentTimeMillis();
+    proxy.transfer(bundle);
+    hadoopServer.stop();
+  }
+
+}

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Wed Feb  8 12:29:23 2012
@@ -34,6 +34,7 @@ public class TestHadoopMessageManager ex
 
   public void testMessaging() throws Exception {
     Configuration conf = new Configuration();
+    conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
     MessageManager messageManager = MessageManagerFactory
         .getMessageManager(conf);
 

Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Wed Feb  8 12:29:23 2012
@@ -185,6 +185,32 @@
         <artifactId>zookeeper</artifactId>
         <version>${zookeeper.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro</artifactId>
+        <version>1.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro-ipc</artifactId>
+        <version>1.6.0</version>
+      </dependency>
+      <dependency>
+        <groupId>org.jboss.netty</groupId>
+        <artifactId>netty</artifactId>
+        <version>3.2.6.Final</version>
+      </dependency>
+      <dependency>
+        <groupId>org.codehaus.jackson</groupId>
+        <artifactId>jackson-core-asl</artifactId>
+        <version>1.9.2</version>
+      </dependency>
+      <dependency>
+        <groupId>org.codehaus.jackson</groupId>
+        <artifactId>jackson-mapper-asl</artifactId>
+        <version>1.9.2</version>
+      </dependency>
+         
     </dependencies>
   </dependencyManagement>