You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/02/23 17:55:49 UTC
svn commit: r1292852 - in /incubator/hama/trunk: ./ conf/
core/src/main/java/org/apache/hama/bsp/
core/src/main/java/org/apache/hama/bsp/message/
core/src/main/java/org/apache/hama/bsp/message/compress/
core/src/test/java/org/apache/hama/bsp/message/ c...
Author: tjungblut
Date: Thu Feb 23 16:55:48 2012
New Revision: 1292852
URL: http://svn.apache.org/viewvc?rev=1292852&view=rev
Log:
- reworked avro test
- compression is now defaulted in xml conf rather than source code
- switched release signing to deploy phase
Modified:
incubator/hama/trunk/conf/hama-default.xml
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
incubator/hama/trunk/pom.xml
Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Thu Feb 23 16:55:48 2012
@@ -89,6 +89,11 @@
</description>
</property>
<property>
+ <name>hama.messenger.compression.class</name>
+ <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
+ <description>The message compression algorithm to choose.</description>
+ </property>
+ <property>
<name>bsp.local.tasks.maximum</name>
<value>10</value>
<description>Number of tasks that run in parallel when in local mode.</description>
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 23 16:55:48 2012
@@ -141,7 +141,7 @@ public final class BSPPeerImpl<K1, V1, K
TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
TaskStatus.Phase.STARTING, counters));
- messenger = new MessageManagerFactory<M>().getMessageManager(conf);
+ messenger = MessageManagerFactory.getMessageManager(conf);
messenger.init(conf, peerAddress);
}
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java Thu Feb 23 16:55:48 2012
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
-public class MessageManagerFactory<M extends Writable> {
+public class MessageManagerFactory {
public static final String MESSAGE_MANAGER_CLASS = "hama.messenger.class";
/**
@@ -31,8 +31,8 @@ public class MessageManagerFactory<M ext
* @return
*/
@SuppressWarnings("unchecked")
- public MessageManager<M> getMessageManager(Configuration conf)
- throws ClassNotFoundException {
+ public static <M extends Writable> MessageManager<M> getMessageManager(
+ Configuration conf) throws ClassNotFoundException {
return (MessageManager<M>) ReflectionUtils.newInstance(conf
.getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
org.apache.hama.bsp.message.AvroMessageManagerImpl.class
Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java Thu Feb 23 16:55:48 2012
@@ -33,12 +33,14 @@ public class BSPMessageCompressorFactory
*/
@SuppressWarnings("unchecked")
public BSPMessageCompressor<M> getCompressor(Configuration conf) {
- try {
- return (BSPMessageCompressor<M>) ReflectionUtils.newInstance(conf
- .getClassByName(conf.get(COMPRESSION_CODEC_CLASS,
- SnappyCompressor.class.getCanonicalName())), conf);
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
+ if (conf.get(COMPRESSION_CODEC_CLASS) != null) {
+ try {
+ return (BSPMessageCompressor<M>) ReflectionUtils.newInstance(conf
+ .getClassByName(conf.get(COMPRESSION_CODEC_CLASS,
+ SnappyCompressor.class.getCanonicalName())), conf);
+ } catch (ClassNotFoundException e) {
+ e.printStackTrace();
+ }
}
return null;
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Thu Feb 23 16:55:48 2012
@@ -17,155 +17,82 @@
*/
package org.apache.hama.bsp.message;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.Random;
import junit.framework.TestCase;
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.io.Writable;
import org.apache.hama.bsp.BSPMessageBundle;
import org.apache.hama.bsp.BooleanMessage;
import org.apache.hama.bsp.DoubleMessage;
import org.apache.hama.bsp.IntegerMessage;
+import org.apache.hama.util.BSPNetUtils;
public class TestAvroMessageManager extends TestCase {
- private static NettyServer server;
- private static Server hadoopServer;
- private static long start;
-
- public void compareMessengers() throws Exception {
- BSPMessageBundle randomBundle = getRandomBundle();
- testAvro(randomBundle);
- testHadoop(randomBundle);
- }
+ private static final int DOUBLE_MSG_COUNT = 400000;
+ private static final int BOOL_MSG_COUNT = 10000;
+ private static final int INT_MSG_COUNT = 500000;
- public static final class MessageSender implements Sender {
+ private static final int SUM = DOUBLE_MSG_COUNT + BOOL_MSG_COUNT
+ + INT_MSG_COUNT;
- @Override
- public Void transfer(AvroBSPMessageBundle messagebundle)
- throws AvroRemoteException {
- try {
- BSPMessageBundle msg = deserializeMessage(messagebundle.getData());
- System.out.println("Received message in "
- + (System.currentTimeMillis() - start) + "ms. Size: "
- + msg.getMessages().size());
- } catch (IOException e) {
- e.printStackTrace();
- }
- return null;
- }
+ public void testAvroMessenger() throws Exception {
+ BSPMessageBundle<Writable> randomBundle = getRandomBundle();
+ Configuration conf = new Configuration();
+ MessageManager<Writable> messageManager = MessageManagerFactory
+ .getMessageManager(conf);
- }
+ assertTrue(messageManager instanceof AvroMessageManagerImpl);
- private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer)
- throws IOException {
- BSPMessageBundle msg = new BSPMessageBundle();
-
- ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
- DataInputStream in = new DataInputStream(inArray);
- msg.readFields(in);
+ InetSocketAddress peer = new InetSocketAddress(
+ BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
+ messageManager.init(conf, peer);
- return msg;
- }
+ messageManager.transfer(peer, randomBundle);
- private static final ByteBuffer serializeMessage(BSPMessageBundle msg)
- throws IOException {
- ByteArrayOutputStream outArray = new ByteArrayOutputStream();
- DataOutputStream out = new DataOutputStream(outArray);
- msg.write(out);
- out.close();
- System.out.println("serialized " + outArray.size() + " bytes");
- return ByteBuffer.wrap(outArray.toByteArray());
- }
+ messageManager.clearOutgoingQueues();
- public static final BSPMessageBundle getRandomBundle() {
- BSPMessageBundle bundle = new BSPMessageBundle();
+ assertEquals(SUM, messageManager.getNumCurrentMessages());
- for (int i = 0; i < 500000; i++) {
- bundle.addMessage(new IntegerMessage("test", i));
- }
+ int numIntMsgs = 0, numBoolMsgs = 0, numDoubleMsgs = 0;
- for (int i = 0; i < 10000; i++) {
- bundle.addMessage(new BooleanMessage("test123", i % 2 == 0));
+ Writable msg = null;
+ while ((msg = messageManager.getCurrentMessage()) != null) {
+ if (msg instanceof IntegerMessage) {
+ numIntMsgs++;
+ } else if (msg instanceof BooleanMessage) {
+ numBoolMsgs++;
+ } else if (msg instanceof DoubleMessage) {
+ numDoubleMsgs++;
+ }
}
- Random r = new Random();
- for (int i = 0; i < 400000; i++) {
- bundle.addMessage(new DoubleMessage("123123asd", r.nextDouble()));
- }
+ assertEquals(INT_MSG_COUNT, numIntMsgs);
+ assertEquals(BOOL_MSG_COUNT, numBoolMsgs);
+ assertEquals(DOUBLE_MSG_COUNT, numDoubleMsgs);
- return bundle;
}
- private static final void testAvro(BSPMessageBundle bundle)
- throws IOException, AvroRemoteException {
-
- server = new NettyServer(new SpecificResponder(Sender.class,
- new MessageSender()), new InetSocketAddress(13530));
-
- NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(
- server.getPort()));
- Sender proxy = (Sender) SpecificRequestor.getClient(Sender.class, client);
-
- AvroBSPMessageBundle msg = new AvroBSPMessageBundle();
+ public final BSPMessageBundle<Writable> getRandomBundle() {
+ BSPMessageBundle<Writable> bundle = new BSPMessageBundle<Writable>();
- msg.setData(serializeMessage(bundle));
-
- start = System.currentTimeMillis();
- proxy.transfer(msg);
-
- server.close();
- client.close();
- }
-
- private static interface RPCTestInterface extends VersionedProtocol {
-
- public void transfer(BSPMessageBundle bundle);
-
- }
-
- private static class HadoopRPCInstance implements RPCTestInterface {
-
- @Override
- public long getProtocolVersion(String arg0, long arg1) throws IOException {
- return 0;
+ for (int i = 0; i < INT_MSG_COUNT; i++) {
+ bundle.addMessage(new IntegerMessage("test", i));
}
- @Override
- public void transfer(BSPMessageBundle bundle) {
- System.out.println("Received message in "
- + (System.currentTimeMillis() - start) + "ms");
+ for (int i = 0; i < BOOL_MSG_COUNT; i++) {
+ bundle.addMessage(new BooleanMessage("test123", i % 2 == 0));
}
- }
+ Random r = new Random();
+ for (int i = 0; i < DOUBLE_MSG_COUNT; i++) {
+ bundle.addMessage(new DoubleMessage("123123asd", r.nextDouble()));
+ }
- private static final void testHadoop(BSPMessageBundle bundle)
- throws IOException {
- Configuration conf = new Configuration();
- HadoopRPCInstance hadoopRPCInstance = new HadoopRPCInstance();
- hadoopServer = new Server(hadoopRPCInstance, conf, new InetSocketAddress(
- 13612).getHostName(), 13612);
- hadoopServer.start();
- RPCTestInterface proxy = (RPCTestInterface) RPC.getProxy(
- RPCTestInterface.class, 0, new InetSocketAddress(13612), conf);
- start = System.currentTimeMillis();
- proxy.transfer(bundle);
- hadoopServer.stop();
+ return bundle;
}
}
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Thu Feb 23 16:55:48 2012
@@ -33,8 +33,9 @@ public class TestHadoopMessageManager ex
public void testMessaging() throws Exception {
Configuration conf = new Configuration();
- conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
- MessageManager<IntWritable> messageManager = new MessageManagerFactory<IntWritable>()
+ conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
+ "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
+ MessageManager<IntWritable> messageManager = MessageManagerFactory
.getMessageManager(conf);
assertTrue(messageManager instanceof HadoopMessageManagerImpl);
@@ -62,7 +63,7 @@ public class TestHadoopMessageManager ex
}
messageManager.transfer(peer, bundle);
-
+
messageManager.clearOutgoingQueues();
assertTrue(messageManager.getNumCurrentMessages() == 1);
Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Thu Feb 23 16:55:48 2012
@@ -27,8 +27,17 @@ import org.apache.hama.bsp.IntegerMessag
public class TestBSPMessageCompressor extends TestCase {
public void testCompression() {
+ Configuration configuration = new Configuration();
BSPMessageCompressor<IntegerMessage> compressor = new BSPMessageCompressorFactory<IntegerMessage>()
- .getCompressor(new Configuration());
+ .getCompressor(configuration);
+
+ assertNull(compressor);
+ configuration.setClass(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
+ SnappyCompressor.class, BSPMessageCompressor.class);
+ compressor = new BSPMessageCompressorFactory<IntegerMessage>()
+ .getCompressor(configuration);
+
+ assertNotNull(compressor);
int n = 20;
BSPMessageBundle<IntegerMessage> bundle = new BSPMessageBundle<IntegerMessage>();
Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Thu Feb 23 16:55:48 2012
@@ -313,7 +313,7 @@
<executions>
<execution>
<id>sign-artifacts</id>
- <phase>verify</phase>
+ <phase>deploy</phase>
<goals>
<goal>sign</goal>
</goals>