You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by ed...@apache.org on 2012/02/08 13:29:23 UTC
svn commit: r1241883 - in /incubator/hama/trunk: ./ conf/ core/
core/src/main/java/org/apache/hama/bsp/message/
core/src/test/java/org/apache/hama/bsp/message/
Author: edwardyoon
Date: Wed Feb 8 12:29:23 2012
New Revision: 1241883
URL: http://svn.apache.org/viewvc?rev=1241883&view=rev
Log:
Add Avro RPC.
Added:
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
Modified:
incubator/hama/trunk/CHANGES.txt
incubator/hama/trunk/conf/hama-default.xml
incubator/hama/trunk/core/pom.xml
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
incubator/hama/trunk/pom.xml
Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Wed Feb 8 12:29:23 2012
@@ -3,9 +3,10 @@ Hama Change Log
Release 0.5 - Unreleased
NEW FEATURES
-
+
+ HAMA-501: Add Avro RPC (tjungblut)
HAMA-456: Add basic Graph interfaces and GraphJobRunner (edwardyoon)
-
+
BUG FIXES
IMPROVEMENTS
Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Wed Feb 8 12:29:23 2012
@@ -156,6 +156,11 @@
</property>
<property>
+ <name>hama.messanger.class</name>
+ <value>org.apache.hama.bsp.message.AvroMessageManagerImpl</value>
+ </property>
+
+ <property>
<name>hama.zookeeper.quorum</name>
<value>localhost</value>
<description>Comma separated list of servers in the ZooKeeper quorum.
Modified: incubator/hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/pom.xml?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/core/pom.xml (original)
+++ incubator/hama/trunk/core/pom.xml Wed Feb 8 12:29:23 2012
@@ -91,11 +91,30 @@
<artifactId>hadoop-test</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroBSPMessageBundle.java Wed Feb 8 12:29:23 2012
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.nio.ByteBuffer;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+
+/**
+ * Avro record that carries one serialized message bundle as an opaque byte
+ * buffer in its single {@code data} field.
+ *
+ * NOTE(review): {@link #SCHEMA$} names the record "AvroBSPMessage" in the
+ * namespace "de.jungblut.avro", which matches neither this class name nor its
+ * Java package - confirm that this is intended.
+ */
+public final class AvroBSPMessageBundle extends SpecificRecordBase implements
+    SpecificRecord {
+  /** Avro schema of this record: one {@code bytes} field named "data". */
+  public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema
+      .parse("{\"type\":\"record\",\"name\":\"AvroBSPMessage\",\"namespace\":\"de.jungblut.avro\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}");
+
+  /**
+   * Raw payload. Prefer {@link #getData()} and {@link #setData(ByteBuffer)}
+   * over direct field access.
+   */
+  @Deprecated
+  public java.nio.ByteBuffer data;
+
+  /**
+   * @return the schema of this record, i.e. {@link #SCHEMA$}.
+   */
+  public final org.apache.avro.Schema getSchema() {
+    return SCHEMA$;
+  }
+
+  /**
+   * Positional read access. Used by DatumWriter. Applications should not
+   * call.
+   *
+   * @param field$ index of the field; only 0 ('data') exists.
+   * @return the value of the field.
+   * @throws org.apache.avro.AvroRuntimeException for any other index.
+   */
+  public final java.lang.Object get(int field$) {
+    switch (field$) {
+      case 0:
+        return data;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Positional write access. Used by DatumReader. Applications should not
+   * call.
+   *
+   * @param field$ index of the field; only 0 ('data') exists.
+   * @param value$ the new value, must be a {@link ByteBuffer}.
+   * @throws org.apache.avro.AvroRuntimeException for any other index.
+   */
+  public final void put(int field$, java.lang.Object value$) {
+    switch (field$) {
+      case 0:
+        data = (java.nio.ByteBuffer) value$;
+        break;
+      default:
+        throw new org.apache.avro.AvroRuntimeException("Bad index");
+    }
+  }
+
+  /**
+   * Gets the value of the 'data' field.
+   */
+  public final java.nio.ByteBuffer getData() {
+    return data;
+  }
+
+  /**
+   * Sets the value of the 'data' field.
+   *
+   * @param value the value to set.
+   */
+  public final void setData(java.nio.ByteBuffer value) {
+    this.data = value;
+  }
+
+  /** Creates a new, empty RecordBuilder. */
+  public final static AvroBSPMessageBundle.Builder newBuilder() {
+    return new AvroBSPMessageBundle.Builder();
+  }
+
+  /** Creates a new RecordBuilder by copying an existing Builder. */
+  public final static AvroBSPMessageBundle.Builder newBuilder(
+      AvroBSPMessageBundle.Builder other) {
+    return new AvroBSPMessageBundle.Builder(other);
+  }
+
+  /** Creates a new RecordBuilder by copying an existing record instance. */
+  public final static AvroBSPMessageBundle.Builder newBuilder(
+      AvroBSPMessageBundle other) {
+    return new AvroBSPMessageBundle.Builder(other);
+  }
+
+  /**
+   * RecordBuilder for {@link AvroBSPMessageBundle} instances.
+   */
+  public final static class Builder extends
+      org.apache.avro.specific.SpecificRecordBuilderBase<AvroBSPMessageBundle>
+      implements org.apache.avro.data.RecordBuilder<AvroBSPMessageBundle> {
+
+    private java.nio.ByteBuffer data;
+
+    /** Creates a new Builder. */
+    private Builder() {
+      super(AvroBSPMessageBundle.SCHEMA$);
+    }
+
+    /** Creates a Builder by copying an existing Builder, payload included. */
+    private Builder(AvroBSPMessageBundle.Builder other) {
+      super(other);
+      // The superclass cannot see this class's private payload field, so it
+      // has to be copied here; otherwise the copy would build an empty record.
+      if (isValidValue(fields[0], other.data)) {
+        data = clone(other.data);
+        fieldSetFlags[0] = true;
+      }
+    }
+
+    /** Creates a Builder by copying an existing record instance. */
+    private Builder(AvroBSPMessageBundle other) {
+      super(AvroBSPMessageBundle.SCHEMA$);
+      if (isValidValue(fields[0], other.data)) {
+        data = clone(other.data);
+        fieldSetFlags[0] = true;
+      }
+    }
+
+    /**
+     * Copies the content of a buffer, from index 0 up to its limit, into a
+     * newly allocated buffer.
+     *
+     * @param original the buffer to copy; its position, limit and mark are
+     *          left untouched.
+     * @return an independent buffer positioned at 0 whose limit is the number
+     *         of copied bytes.
+     */
+    public final ByteBuffer clone(ByteBuffer original) {
+      // Read through a duplicate so the caller's buffer state is not modified.
+      ByteBuffer source = original.duplicate();
+      source.rewind();
+      ByteBuffer copy = ByteBuffer.allocate(source.remaining());
+      copy.put(source);
+      copy.flip();
+      return copy;
+    }
+
+    /** Gets the value of the 'data' field */
+    public final java.nio.ByteBuffer getData() {
+      return data;
+    }
+
+    /** Sets the value of the 'data' field after validating it */
+    public final AvroBSPMessageBundle.Builder setData(java.nio.ByteBuffer value) {
+      validate(fields[0], value);
+      this.data = value;
+      fieldSetFlags[0] = true;
+      return this;
+    }
+
+    /** Checks whether the 'data' field has been set */
+    public final boolean hasData() {
+      return fieldSetFlags[0];
+    }
+
+    /** Clears the value of the 'data' field */
+    public final AvroBSPMessageBundle.Builder clearData() {
+      data = null;
+      fieldSetFlags[0] = false;
+      return this;
+    }
+
+    /**
+     * Builds the record, using the schema default for 'data' when the field
+     * has not been set.
+     *
+     * @throws org.apache.avro.AvroRuntimeException wrapping any failure.
+     */
+    @Override
+    public final AvroBSPMessageBundle build() {
+      try {
+        AvroBSPMessageBundle record = new AvroBSPMessageBundle();
+        record.data = fieldSetFlags[0] ? this.data
+            : (java.nio.ByteBuffer) getDefaultValue(fields[0]);
+        return record;
+      } catch (Exception e) {
+        throw new org.apache.avro.AvroRuntimeException(e);
+      }
+    }
+  }
+}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/AvroMessageManagerImpl.java Wed Feb 8 12:29:23 2012
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hama.bsp.BSPMessage;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.util.BSPNetUtils;
+
+/**
+ * {@link MessageManager} that exchanges message bundles with other peers via
+ * Avro RPC over Netty.
+ *
+ * Messages passed to {@link #send(String, BSPMessage)} are queued per target
+ * address. Bundles received through the {@link Sender} RPC are collected in a
+ * separate queue and become readable after {@link #clearOutgoingQueues()}.
+ */
+public class AvroMessageManagerImpl implements MessageManager, Sender {
+
+  private static final Log LOG = LogFactory
+      .getLog(AvroMessageManagerImpl.class);
+
+  private NettyServer server = null;
+
+  private final HashMap<InetSocketAddress, Sender> peers = new HashMap<InetSocketAddress, Sender>();
+  // connections backing the proxies in 'peers', kept so close() can release
+  // them
+  private final LinkedList<NettyTransceiver> transceivers = new LinkedList<NettyTransceiver>();
+  private final HashMap<String, InetSocketAddress> peerSocketCache = new HashMap<String, InetSocketAddress>();
+
+  private final HashMap<InetSocketAddress, LinkedList<BSPMessage>> outgoingQueues = new HashMap<InetSocketAddress, LinkedList<BSPMessage>>();
+  private final Deque<BSPMessage> localQueue = new LinkedList<BSPMessage>();
+  // this must be a thread-safe implementation: it is filled by incoming RPC
+  // calls
+  private final ConcurrentLinkedQueue<BSPMessage> localQueueForNextIteration = new ConcurrentLinkedQueue<BSPMessage>();
+
+  /**
+   * Creates the RPC server that receives bundles for this peer.
+   *
+   * @param conf the configuration (not read by this implementation).
+   * @param addr the address the server is bound to.
+   */
+  @Override
+  public void init(Configuration conf, InetSocketAddress addr) {
+    server = new NettyServer(new SpecificResponder(Sender.class, this), addr);
+  }
+
+  /**
+   * Closes all connections opened to other peers and the local server. Safe to
+   * call when {@link #init(Configuration, InetSocketAddress)} was never
+   * called, and safe to call more than once.
+   */
+  @Override
+  public void close() {
+    for (NettyTransceiver transceiver : transceivers) {
+      try {
+        transceiver.close();
+      } catch (Exception e) {
+        // keep going: the remaining connections and the server must still be
+        // released
+        LOG.warn("Could not close connection to a peer", e);
+      }
+    }
+    transceivers.clear();
+    peers.clear();
+    if (server != null) {
+      server.close();
+      server = null;
+    }
+  }
+
+  /**
+   * Drops all outgoing queues and moves the messages received so far into the
+   * queue read by {@link #getCurrentMessage()}.
+   */
+  @Override
+  public void clearOutgoingQueues() {
+    this.outgoingQueues.clear();
+    // Drain by polling: a message added by an RPC thread during the move is
+    // either transferred now or stays queued, but is never discarded.
+    BSPMessage received;
+    while ((received = localQueueForNextIteration.poll()) != null) {
+      localQueue.add(received);
+    }
+  }
+
+  /**
+   * Queues all messages of a received bundle for the next iteration.
+   *
+   * @param messages the received bundle.
+   */
+  public void put(BSPMessageBundle messages) {
+    for (BSPMessage message : messages.getMessages()) {
+      this.localQueueForNextIteration.add(message);
+    }
+  }
+
+  /**
+   * @return the number of messages currently available for reading.
+   */
+  @Override
+  public int getNumCurrentMessages() {
+    return localQueue.size();
+  }
+
+  /**
+   * Serializes a bundle and sends it to the given peer, connecting to that
+   * peer on first use.
+   *
+   * @param addr the address of the target peer.
+   * @param bundle the bundle to send.
+   * @throws IOException if serialization, connecting or the remote call fails.
+   */
+  @Override
+  public void transfer(InetSocketAddress addr, BSPMessageBundle bundle)
+      throws IOException {
+    AvroBSPMessageBundle msg = new AvroBSPMessageBundle();
+    msg.setData(serializeMessage(bundle));
+    Sender sender = peers.get(addr);
+
+    if (sender == null) {
+      NettyTransceiver client = new NettyTransceiver(addr);
+      // remember the connection right away so that close() releases it even
+      // if creating the proxy fails
+      transceivers.add(client);
+      sender = (Sender) SpecificRequestor.getClient(Sender.class, client);
+      peers.put(addr, sender);
+    }
+
+    sender.transfer(msg);
+  }
+
+  /**
+   * RPC entry point: deserializes a bundle sent by another peer and queues its
+   * messages.
+   *
+   * @param messagebundle the received record.
+   * @return always null.
+   * @throws AvroRemoteException if the payload cannot be deserialized, so that
+   *           the failure is reported instead of silently dropping messages.
+   */
+  @Override
+  public Void transfer(AvroBSPMessageBundle messagebundle)
+      throws AvroRemoteException {
+    try {
+      this.put(deserializeMessage(messagebundle.getData()));
+    } catch (IOException e) {
+      LOG.error("Could not deserialize received message bundle", e);
+      throw new AvroRemoteException(e);
+    }
+    return null;
+  }
+
+  /**
+   * @return the next readable message, or null if there is none.
+   */
+  @Override
+  public BSPMessage getCurrentMessage() throws IOException {
+    return localQueue.poll();
+  }
+
+  /**
+   * Queues a message for the named peer. Nothing is sent by this method.
+   *
+   * @param peerName the name of the target peer, resolved to an address once
+   *          and cached afterwards.
+   * @param msg the message to queue.
+   */
+  @Override
+  public void send(String peerName, BSPMessage msg) throws IOException {
+    LOG.debug("Send message (" + msg.toString() + ") to " + peerName);
+    // Get socket for target peer.
+    InetSocketAddress targetPeerAddress = peerSocketCache.get(peerName);
+    if (targetPeerAddress == null) {
+      targetPeerAddress = BSPNetUtils.getAddress(peerName);
+      peerSocketCache.put(peerName, targetPeerAddress);
+    }
+    LinkedList<BSPMessage> queue = outgoingQueues.get(targetPeerAddress);
+    if (queue == null) {
+      queue = new LinkedList<BSPMessage>();
+      outgoingQueues.put(targetPeerAddress, queue);
+    }
+    queue.add(msg);
+  }
+
+  /**
+   * Reads a bundle from the readable part of a buffer.
+   *
+   * @param buffer the serialized bundle; its state is not modified.
+   */
+  private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer)
+      throws IOException {
+    // Only the bytes between position and limit belong to the payload; the
+    // backing array may be larger, may start at an offset, or may not exist.
+    ByteArrayInputStream inArray;
+    if (buffer.hasArray()) {
+      inArray = new ByteArrayInputStream(buffer.array(), buffer.arrayOffset()
+          + buffer.position(), buffer.remaining());
+    } else {
+      ByteBuffer view = buffer.duplicate();
+      byte[] bytes = new byte[view.remaining()];
+      view.get(bytes);
+      inArray = new ByteArrayInputStream(bytes);
+    }
+
+    BSPMessageBundle msg = new BSPMessageBundle();
+    msg.readFields(new DataInputStream(inArray));
+    return msg;
+  }
+
+  /**
+   * Writes a bundle into a new buffer.
+   *
+   * @param msg the bundle to serialize.
+   * @return a buffer wrapping exactly the serialized bytes.
+   */
+  private static final ByteBuffer serializeMessage(BSPMessageBundle msg)
+      throws IOException {
+    ByteArrayOutputStream outArray = new ByteArrayOutputStream();
+    DataOutputStream out = new DataOutputStream(outArray);
+    msg.write(out);
+    out.close();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("serialized " + outArray.size() + " bytes");
+    }
+    return ByteBuffer.wrap(outArray.toByteArray());
+  }
+
+  /**
+   * @return an iterator over the outgoing queues, one entry per target
+   *         address.
+   */
+  @Override
+  public Iterator<Entry<InetSocketAddress, LinkedList<BSPMessage>>> getMessageIterator() {
+    return this.outgoingQueues.entrySet().iterator();
+  }
+}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java Wed Feb 8 12:29:23 2012
@@ -34,7 +34,7 @@ public class MessageManagerFactory {
throws ClassNotFoundException {
return (MessageManager) ReflectionUtils.newInstance(conf
.getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
- org.apache.hama.bsp.message.HadoopMessageManagerImpl.class
+ org.apache.hama.bsp.message.AvroMessageManagerImpl.class
.getCanonicalName())), conf);
}
Added: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java (added)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/Sender.java Wed Feb 8 12:29:23 2012
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp.message;
+
+/**
+ * Avro RPC interface for handing a message bundle to another peer.
+ *
+ * NOTE(review): the protocol declares the namespace "de.jungblut.avro" while
+ * this interface and {@link AvroBSPMessageBundle} live in
+ * org.apache.hama.bsp.message - confirm that this is intended.
+ */
+public interface Sender {
+  /**
+   * The "Sender" protocol: one record type with a single bytes field "data"
+   * and one message "transfer" that takes such a record and responds with
+   * null.
+   */
+  org.apache.avro.Protocol PROTOCOL = org.apache.avro.Protocol
+      .parse("{\"protocol\":\"Sender\",\"namespace\":\"de.jungblut.avro\",\"types\":[{\"type\":\"record\",\"name\":\"AvroBSPMessageBundle\",\"fields\":[{\"name\":\"data\",\"type\":\"bytes\"}]}],\"messages\":{\"transfer\":{\"request\":[{\"name\":\"messagebundle\",\"type\":\"AvroBSPMessageBundle\"}],\"response\":\"null\"}}}");
+
+  /**
+   * Transfers a bundle to the implementing side.
+   *
+   * @param messagebundle the record holding the bundle.
+   * @return the (null) response of the "transfer" message.
+   * @throws org.apache.avro.AvroRemoteException declared for failures of the
+   *           call.
+   */
+  Void transfer(AvroBSPMessageBundle messagebundle)
+      throws org.apache.avro.AvroRemoteException;
+
+  /**
+   * Variant of {@link Sender} whose transfer method additionally takes an Avro
+   * callback for the response.
+   */
+  @SuppressWarnings("all")
+  interface Callback extends Sender {
+    /** Same protocol as {@link Sender#PROTOCOL}. */
+    org.apache.avro.Protocol PROTOCOL = Sender.PROTOCOL;
+
+    /**
+     * Transfers a bundle, passing a callback for the response.
+     *
+     * @param messagebundle the record holding the bundle.
+     * @param callback the callback handed to the call.
+     * @throws java.io.IOException declared for failures of the call.
+     */
+    void transfer(AvroBSPMessageBundle messagebundle,
+        org.apache.avro.ipc.Callback<java.lang.Void> callback)
+        throws java.io.IOException;
+  }
+}
Added: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1241883&view=auto
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (added)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Wed Feb 8 12:29:23 2012
@@ -0,0 +1,151 @@
+package org.apache.hama.bsp.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import org.apache.avro.AvroRemoteException;
+import org.apache.avro.ipc.NettyServer;
+import org.apache.avro.ipc.NettyTransceiver;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hama.bsp.BSPMessageBundle;
+import org.apache.hama.bsp.BooleanMessage;
+import org.apache.hama.bsp.DoubleMessage;
+import org.apache.hama.bsp.IntegerMessage;
+
+public class TestAvroMessageManager {
+
+ private static NettyServer server;
+ private static Server hadoopServer;
+ private static long start;
+
+ public static final class MessageSender implements Sender {
+
+ @Override
+ public Void transfer(AvroBSPMessageBundle messagebundle)
+ throws AvroRemoteException {
+ try {
+ BSPMessageBundle msg = deserializeMessage(messagebundle.data);
+ System.out.println("Received message in "
+ + (System.currentTimeMillis() - start) + "ms");
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ }
+
+ private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer)
+ throws IOException {
+ BSPMessageBundle msg = new BSPMessageBundle();
+
+ ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
+ DataInputStream in = new DataInputStream(inArray);
+ msg.readFields(in);
+
+ return msg;
+ }
+
+ private static final ByteBuffer serializeMessage(BSPMessageBundle msg)
+ throws IOException {
+ ByteArrayOutputStream outArray = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(outArray);
+ msg.write(out);
+ out.close();
+ System.out.println("serialized " + outArray.size() + " bytes");
+ return ByteBuffer.wrap(outArray.toByteArray());
+ }
+
+ public static final BSPMessageBundle getRandomBundle() {
+ BSPMessageBundle bundle = new BSPMessageBundle();
+
+ for (int i = 0; i < 500000; i++) {
+ bundle.addMessage(new IntegerMessage("test", i));
+ }
+
+ for (int i = 0; i < 10000; i++) {
+ bundle.addMessage(new BooleanMessage("test123", i % 2 == 0));
+ }
+
+ Random r = new Random();
+ for (int i = 0; i < 400000; i++) {
+ bundle.addMessage(new DoubleMessage("123123asd", r.nextDouble()));
+ }
+
+ return bundle;
+ }
+
+ public static final void main(String[] args) throws IOException {
+ BSPMessageBundle randomBundle = getRandomBundle();
+ testAvro(randomBundle);
+ testHadoop(randomBundle);
+ }
+
+ private static final void testAvro(BSPMessageBundle bundle)
+ throws IOException, AvroRemoteException {
+
+ server = new NettyServer(new SpecificResponder(Sender.class,
+ new MessageSender()), new InetSocketAddress(13530));
+
+ NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(server
+ .getPort()));
+ Sender proxy = (Sender) SpecificRequestor.getClient(Sender.class, client);
+
+ AvroBSPMessageBundle msg = new AvroBSPMessageBundle();
+
+ msg.setData(serializeMessage(bundle));
+
+ start = System.currentTimeMillis();
+ proxy.transfer(msg);
+
+ server.close();
+ client.close();
+ }
+
+ private static interface RPCTestInterface extends VersionedProtocol {
+
+ public void transfer(BSPMessageBundle bundle);
+
+ }
+
+ private static class HadoopRPCInstance implements RPCTestInterface {
+
+ @Override
+ public long getProtocolVersion(String arg0, long arg1) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public void transfer(BSPMessageBundle bundle) {
+ System.out.println("Received message in "
+ + (System.currentTimeMillis() - start) + "ms");
+ }
+
+ }
+
+ private static final void testHadoop(BSPMessageBundle bundle)
+ throws IOException {
+ Configuration conf = new Configuration();
+ HadoopRPCInstance hadoopRPCInstance = new HadoopRPCInstance();
+ hadoopServer = new Server(hadoopRPCInstance, conf, new InetSocketAddress(
+ 13612).getHostName(), 13612);
+ hadoopServer.start();
+ RPCTestInterface proxy = (RPCTestInterface) RPC.getProxy(
+ RPCTestInterface.class, 0, new InetSocketAddress(13612), conf);
+ start = System.currentTimeMillis();
+ proxy.transfer(bundle);
+ hadoopServer.stop();
+ }
+
+}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Wed Feb 8 12:29:23 2012
@@ -34,6 +34,7 @@ public class TestHadoopMessageManager ex
public void testMessaging() throws Exception {
Configuration conf = new Configuration();
+ conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
MessageManager messageManager = MessageManagerFactory
.getMessageManager(conf);
Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1241883&r1=1241882&r2=1241883&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Wed Feb 8 12:29:23 2012
@@ -185,6 +185,32 @@
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>1.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-ipc</artifactId>
+ <version>1.6.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.2.6.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>1.9.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>1.9.2</version>
+ </dependency>
+
</dependencies>
</dependencyManagement>