You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hama.apache.org by tj...@apache.org on 2012/02/23 17:55:49 UTC

svn commit: r1292852 - in /incubator/hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/message/ core/src/main/java/org/apache/hama/bsp/message/compress/ core/src/test/java/org/apache/hama/bsp/message/ c...

Author: tjungblut
Date: Thu Feb 23 16:55:48 2012
New Revision: 1292852

URL: http://svn.apache.org/viewvc?rev=1292852&view=rev
Log:
- reworked avro test
- compression is now defaulted in xml conf rather than source code
- switched release signing to deploy phase


Modified:
    incubator/hama/trunk/conf/hama-default.xml
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
    incubator/hama/trunk/pom.xml

Modified: incubator/hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/conf/hama-default.xml?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/conf/hama-default.xml (original)
+++ incubator/hama/trunk/conf/hama-default.xml Thu Feb 23 16:55:48 2012
@@ -89,6 +89,11 @@
     </description>
   </property>
   <property>
+    <name>hama.messenger.compression.class</name>
+    <value>org.apache.hama.bsp.message.compress.SnappyCompressor</value>
+    <description>The message compression algorithm to choose.</description>
+  </property>
+  <property>
     <name>bsp.local.tasks.maximum</name>
     <value>10</value>
     <description>Number of tasks that run in parallel when in local mode.</description>

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Feb 23 16:55:48 2012
@@ -141,7 +141,7 @@ public final class BSPPeerImpl<K1, V1, K
         TaskStatus.State.RUNNING, "running", peerAddress.getHostName(),
         TaskStatus.Phase.STARTING, counters));
 
-    messenger = new MessageManagerFactory<M>().getMessageManager(conf);
+    messenger = MessageManagerFactory.getMessageManager(conf);
     messenger.init(conf, peerAddress);
 
   }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/MessageManagerFactory.java Thu Feb 23 16:55:48 2012
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-public class MessageManagerFactory<M extends Writable> {
+public class MessageManagerFactory {
   public static final String MESSAGE_MANAGER_CLASS = "hama.messenger.class";
 
   /**
@@ -31,8 +31,8 @@ public class MessageManagerFactory<M ext
    * @return
    */
   @SuppressWarnings("unchecked")
-  public MessageManager<M> getMessageManager(Configuration conf)
-      throws ClassNotFoundException {
+  public static <M extends Writable> MessageManager<M> getMessageManager(
+      Configuration conf) throws ClassNotFoundException {
     return (MessageManager<M>) ReflectionUtils.newInstance(conf
         .getClassByName(conf.get(MESSAGE_MANAGER_CLASS,
             org.apache.hama.bsp.message.AvroMessageManagerImpl.class

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/message/compress/BSPMessageCompressorFactory.java Thu Feb 23 16:55:48 2012
@@ -33,12 +33,14 @@ public class BSPMessageCompressorFactory
    */
   @SuppressWarnings("unchecked")
   public BSPMessageCompressor<M> getCompressor(Configuration conf) {
-    try {
-      return (BSPMessageCompressor<M>) ReflectionUtils.newInstance(conf
-          .getClassByName(conf.get(COMPRESSION_CODEC_CLASS,
-              SnappyCompressor.class.getCanonicalName())), conf);
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
+    if (conf.get(COMPRESSION_CODEC_CLASS) != null) {
+      try {
+        return (BSPMessageCompressor<M>) ReflectionUtils.newInstance(conf
+            .getClassByName(conf.get(COMPRESSION_CODEC_CLASS,
+                SnappyCompressor.class.getCanonicalName())), conf);
+      } catch (ClassNotFoundException e) {
+        e.printStackTrace();
+      }
     }
     return null;
   }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestAvroMessageManager.java Thu Feb 23 16:55:48 2012
@@ -17,155 +17,82 @@
  */
 package org.apache.hama.bsp.message;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.Random;
 
 import junit.framework.TestCase;
 
-import org.apache.avro.AvroRemoteException;
-import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.NettyTransceiver;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RPC.Server;
-import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPMessageBundle;
 import org.apache.hama.bsp.BooleanMessage;
 import org.apache.hama.bsp.DoubleMessage;
 import org.apache.hama.bsp.IntegerMessage;
+import org.apache.hama.util.BSPNetUtils;
 
 public class TestAvroMessageManager extends TestCase {
 
-  private static NettyServer server;
-  private static Server hadoopServer;
-  private static long start;
-
-  public void compareMessengers() throws Exception {
-    BSPMessageBundle randomBundle = getRandomBundle();
-    testAvro(randomBundle);
-    testHadoop(randomBundle);
-  }
+  private static final int DOUBLE_MSG_COUNT = 400000;
+  private static final int BOOL_MSG_COUNT = 10000;
+  private static final int INT_MSG_COUNT = 500000;
 
-  public static final class MessageSender implements Sender {
+  private static final int SUM = DOUBLE_MSG_COUNT + BOOL_MSG_COUNT
+      + INT_MSG_COUNT;
 
-    @Override
-    public Void transfer(AvroBSPMessageBundle messagebundle)
-        throws AvroRemoteException {
-      try {
-        BSPMessageBundle msg = deserializeMessage(messagebundle.getData());
-        System.out.println("Received message in "
-            + (System.currentTimeMillis() - start) + "ms. Size: "
-            + msg.getMessages().size());
-      } catch (IOException e) {
-        e.printStackTrace();
-      }
-      return null;
-    }
+  public void testAvroMessenger() throws Exception {
+    BSPMessageBundle<Writable> randomBundle = getRandomBundle();
+    Configuration conf = new Configuration();
+    MessageManager<Writable> messageManager = MessageManagerFactory
+        .getMessageManager(conf);
 
-  }
+    assertTrue(messageManager instanceof AvroMessageManagerImpl);
 
-  private static final BSPMessageBundle deserializeMessage(ByteBuffer buffer)
-      throws IOException {
-    BSPMessageBundle msg = new BSPMessageBundle();
-
-    ByteArrayInputStream inArray = new ByteArrayInputStream(buffer.array());
-    DataInputStream in = new DataInputStream(inArray);
-    msg.readFields(in);
+    InetSocketAddress peer = new InetSocketAddress(
+        BSPNetUtils.getCanonicalHostname(), BSPNetUtils.getFreePort());
+    messageManager.init(conf, peer);
 
-    return msg;
-  }
+    messageManager.transfer(peer, randomBundle);
 
-  private static final ByteBuffer serializeMessage(BSPMessageBundle msg)
-      throws IOException {
-    ByteArrayOutputStream outArray = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(outArray);
-    msg.write(out);
-    out.close();
-    System.out.println("serialized " + outArray.size() + " bytes");
-    return ByteBuffer.wrap(outArray.toByteArray());
-  }
+    messageManager.clearOutgoingQueues();
 
-  public static final BSPMessageBundle getRandomBundle() {
-    BSPMessageBundle bundle = new BSPMessageBundle();
+    assertEquals(SUM, messageManager.getNumCurrentMessages());
 
-    for (int i = 0; i < 500000; i++) {
-      bundle.addMessage(new IntegerMessage("test", i));
-    }
+    int numIntMsgs = 0, numBoolMsgs = 0, numDoubleMsgs = 0;
 
-    for (int i = 0; i < 10000; i++) {
-      bundle.addMessage(new BooleanMessage("test123", i % 2 == 0));
+    Writable msg = null;
+    while ((msg = messageManager.getCurrentMessage()) != null) {
+      if (msg instanceof IntegerMessage) {
+        numIntMsgs++;
+      } else if (msg instanceof BooleanMessage) {
+        numBoolMsgs++;
+      } else if (msg instanceof DoubleMessage) {
+        numDoubleMsgs++;
+      }
     }
 
-    Random r = new Random();
-    for (int i = 0; i < 400000; i++) {
-      bundle.addMessage(new DoubleMessage("123123asd", r.nextDouble()));
-    }
+    assertEquals(INT_MSG_COUNT, numIntMsgs);
+    assertEquals(BOOL_MSG_COUNT, numBoolMsgs);
+    assertEquals(DOUBLE_MSG_COUNT, numDoubleMsgs);
 
-    return bundle;
   }
 
-  private static final void testAvro(BSPMessageBundle bundle)
-      throws IOException, AvroRemoteException {
-
-    server = new NettyServer(new SpecificResponder(Sender.class,
-        new MessageSender()), new InetSocketAddress(13530));
-
-    NettyTransceiver client = new NettyTransceiver(new InetSocketAddress(
-        server.getPort()));
-    Sender proxy = (Sender) SpecificRequestor.getClient(Sender.class, client);
-
-    AvroBSPMessageBundle msg = new AvroBSPMessageBundle();
+  public final BSPMessageBundle<Writable> getRandomBundle() {
+    BSPMessageBundle<Writable> bundle = new BSPMessageBundle<Writable>();
 
-    msg.setData(serializeMessage(bundle));
-
-    start = System.currentTimeMillis();
-    proxy.transfer(msg);
-
-    server.close();
-    client.close();
-  }
-
-  private static interface RPCTestInterface extends VersionedProtocol {
-
-    public void transfer(BSPMessageBundle bundle);
-
-  }
-
-  private static class HadoopRPCInstance implements RPCTestInterface {
-
-    @Override
-    public long getProtocolVersion(String arg0, long arg1) throws IOException {
-      return 0;
+    for (int i = 0; i < INT_MSG_COUNT; i++) {
+      bundle.addMessage(new IntegerMessage("test", i));
     }
 
-    @Override
-    public void transfer(BSPMessageBundle bundle) {
-      System.out.println("Received message in "
-          + (System.currentTimeMillis() - start) + "ms");
+    for (int i = 0; i < BOOL_MSG_COUNT; i++) {
+      bundle.addMessage(new BooleanMessage("test123", i % 2 == 0));
     }
 
-  }
+    Random r = new Random();
+    for (int i = 0; i < DOUBLE_MSG_COUNT; i++) {
+      bundle.addMessage(new DoubleMessage("123123asd", r.nextDouble()));
+    }
 
-  private static final void testHadoop(BSPMessageBundle bundle)
-      throws IOException {
-    Configuration conf = new Configuration();
-    HadoopRPCInstance hadoopRPCInstance = new HadoopRPCInstance();
-    hadoopServer = new Server(hadoopRPCInstance, conf, new InetSocketAddress(
-        13612).getHostName(), 13612);
-    hadoopServer.start();
-    RPCTestInterface proxy = (RPCTestInterface) RPC.getProxy(
-        RPCTestInterface.class, 0, new InetSocketAddress(13612), conf);
-    start = System.currentTimeMillis();
-    proxy.transfer(bundle);
-    hadoopServer.stop();
+    return bundle;
   }
 
 }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/TestHadoopMessageManager.java Thu Feb 23 16:55:48 2012
@@ -33,8 +33,9 @@ public class TestHadoopMessageManager ex
 
   public void testMessaging() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS, "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
-    MessageManager<IntWritable> messageManager = new MessageManagerFactory<IntWritable>()
+    conf.set(MessageManagerFactory.MESSAGE_MANAGER_CLASS,
+        "org.apache.hama.bsp.message.HadoopMessageManagerImpl");
+    MessageManager<IntWritable> messageManager = MessageManagerFactory
         .getMessageManager(conf);
 
     assertTrue(messageManager instanceof HadoopMessageManagerImpl);
@@ -62,7 +63,7 @@ public class TestHadoopMessageManager ex
     }
 
     messageManager.transfer(peer, bundle);
-    
+
     messageManager.clearOutgoingQueues();
 
     assertTrue(messageManager.getNumCurrentMessages() == 1);

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/message/compress/TestBSPMessageCompressor.java Thu Feb 23 16:55:48 2012
@@ -27,8 +27,17 @@ import org.apache.hama.bsp.IntegerMessag
 public class TestBSPMessageCompressor extends TestCase {
 
   public void testCompression() {
+    Configuration configuration = new Configuration();
     BSPMessageCompressor<IntegerMessage> compressor = new BSPMessageCompressorFactory<IntegerMessage>()
-        .getCompressor(new Configuration());
+        .getCompressor(configuration);
+
+    assertNull(compressor);
+    configuration.setClass(BSPMessageCompressorFactory.COMPRESSION_CODEC_CLASS,
+        SnappyCompressor.class, BSPMessageCompressor.class);
+    compressor = new BSPMessageCompressorFactory<IntegerMessage>()
+        .getCompressor(configuration);
+    
+    assertNotNull(compressor);
 
     int n = 20;
     BSPMessageBundle<IntegerMessage> bundle = new BSPMessageBundle<IntegerMessage>();

Modified: incubator/hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/pom.xml?rev=1292852&r1=1292851&r2=1292852&view=diff
==============================================================================
--- incubator/hama/trunk/pom.xml (original)
+++ incubator/hama/trunk/pom.xml Thu Feb 23 16:55:48 2012
@@ -313,7 +313,7 @@
         <executions>
           <execution>
             <id>sign-artifacts</id>
-            <phase>verify</phase>
+            <phase>deploy</phase>
             <goals>
               <goal>sign</goal>
             </goals>