You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/14 23:26:18 UTC

svn commit: r1446372 - in /avro/trunk: ./ lang/java/ipc/src/test/java/org/apache/avro/ipc/

Author: cutting
Date: Thu Feb 14 22:26:18 2013
New Revision: 1446372

URL: http://svn.apache.org/r1446372
Log:
Java: Add TestNettyServerWithCompression, illustrating how one can add compression to Avro Netty-based RPC.  Contributed by Ted Malaska.

Added:
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java   (with props)
Modified:
    avro/trunk/CHANGES.txt
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
    avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java

Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1446372&r1=1446371&r2=1446372&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Feb 14 22:26:18 2013
@@ -47,6 +47,10 @@ Trunk (not yet released)
     AVRO-1255. Python: Make 'names' parameter optional in to_json methods.
     (Jeremy Kahn via cutting)
 
+    AVRO-1251. Java: Add TestNettyServerWithCompression, illustrating
+    how one can add compression to Avro Netty-based RPC.
+    (Ted Malaska via cutting)
+
   BUG FIXES
 
     AVRO-1231. Java: Fix Trevni shredder to work on non-recursive

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1446372&r1=1446371&r2=1446372&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java Thu Feb 14 22:26:18 2013
@@ -80,17 +80,25 @@ public class TestNettyServer {
     System.out.println("starting server...");
     mailService = new MailImpl();
     Responder responder = new SpecificResponder(Mail.class, mailService);
-    server = new NettyServer(responder, new InetSocketAddress(0));
+    server = initializeServer(responder);
     server.start();
   
     int serverPort = server.getPort();
     System.out.println("server port : " + serverPort);
 
-    transceiver = new NettyTransceiver(new InetSocketAddress(
-        serverPort), CONNECT_TIMEOUT_MILLIS);
+    transceiver = initializeTransceiver(serverPort);
     proxy = SpecificRequestor.getClient(Mail.class, transceiver);
   }
   
+  protected static Server initializeServer(Responder responder) { // builds the server; the caller starts it
+    return new NettyServer(responder, new InetSocketAddress(0)); // port 0; the real port is read back via getPort()
+  }
+  
+  protected static Transceiver initializeTransceiver(int serverPort) throws IOException { // builds the client-side transceiver
+    return new NettyTransceiver(new InetSocketAddress(
+        serverPort), CONNECT_TIMEOUT_MILLIS); // serverPort is the value the caller got from server.getPort()
+  }
+  
   @AfterClass
   public static void tearDownConnections() throws Exception{
     transceiver.close();

Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java?rev=1446372&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java Thu Feb 14 22:26:18 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Mail;
+import org.apache.avro.test.Message;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertEquals;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestNettyServerWithCompression extends TestNettyServer{
+  // NOTE(review): the two static methods below hide, rather than override, the same-named
+  // TestNettyServer methods; confirm the inherited setup actually reaches these versions.
+  protected static Server initializeServer(Responder responder) {
+    ChannelFactory channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(),
+        Executors.newCachedThreadPool()
+    );
+    return  new NettyServer(responder, new InetSocketAddress(0),
+        channelFactory, new CompressionChannelPipelineFactory(),
+        null); // NOTE(review): the meaning of this trailing null is not visible in this patch
+  }
+  
+  protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
+    return  new NettyTransceiver(new InetSocketAddress(serverPort),
+        new CompressionChannelFactory(),
+        CONNECT_TIMEOUT_MILLIS);
+  }
+
+
+  /**
+   * Client channel factory that adds zlib handlers to the pipeline of every channel it creates.
+   */
+  private static class CompressionChannelFactory extends NioClientSocketChannelFactory {
+    public CompressionChannelFactory() {
+      super(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+    }
+
+    @Override
+    public SocketChannel newChannel(ChannelPipeline pipeline) {
+      try {
+        ZlibEncoder encoder = new ZlibEncoder(6); // presumably the zlib compression level -- TODO confirm
+        pipeline.addFirst("deflater", encoder);
+        pipeline.addFirst("inflater", new ZlibDecoder()); // added first last, so it sits ahead of "deflater"
+        return super.newChannel(pipeline);
+      } catch (Exception ex) {
+        throw new RuntimeException("Cannot create Compression channel", ex); // keeps the original cause
+      }
+    }
+  }
+
+
+
+  /**
+   * Pipeline factory handed to the server; each new pipeline gets the same two zlib handlers.
+   */
+  private static class CompressionChannelPipelineFactory
+      implements ChannelPipelineFactory {
+
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+      ChannelPipeline pipeline = Channels.pipeline();
+      ZlibEncoder encoder = new ZlibEncoder(6); // same argument as the client-side encoder
+      pipeline.addFirst("deflater", encoder);
+      pipeline.addFirst("inflater", new ZlibDecoder()); // mirrors the handler names and order used by the client factory
+      return pipeline;
+    }
+  }
+}

Propchange: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java?rev=1446372&r1=1446371&r2=1446372&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java Thu Feb 14 22:26:18 2013
@@ -18,24 +18,18 @@
 
 package org.apache.avro.ipc;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.KeyStore;
 import java.security.Security;
 import java.security.cert.X509Certificate;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.TrustManager;
 import javax.net.ssl.X509TrustManager;
 
-import junit.framework.Assert;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.avro.ipc.specific.SpecificResponder;
-import org.apache.avro.test.Mail;
-import org.apache.avro.test.Message;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -44,121 +38,27 @@ import org.jboss.netty.channel.socket.So
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.ssl.SslHandler;
-import org.junit.AfterClass;
-import static org.junit.Assert.assertEquals;
-import org.junit.BeforeClass;
-import org.junit.Test;
 
-public class TestNettyServerWithSSL {
+public class TestNettyServerWithSSL extends TestNettyServer{
   public static final String TEST_CERTIFICATE = "servercert.p12";
   public static final String TEST_CERTIFICATE_PASSWORD = "s3cret";
-  static final long CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
-  private static Server server;
-  private static Transceiver transceiver;
-  private static Mail proxy;
-  private static MailImpl mailService;
-
-  public static class MailImpl implements Mail {
-
-    private CountDownLatch allMessages = new CountDownLatch(5);
-
-    // in this simple example just return details of the message
-    public String send(Message message) {
-      return "Sent message to [" + message.getTo().toString() +
-             "] from [" + message.getFrom().toString() + "] with body [" +
-             message.getBody().toString() + "]";
-    }
-
-    public void fireandforget(Message message) {
-      allMessages.countDown();
-    }
-
-    private void awaitMessages() throws InterruptedException {
-      allMessages.await(2, TimeUnit.SECONDS);
-    }
-
-    private void assertAllMessagesReceived() {
-      assertEquals(0, allMessages.getCount());
-    }
-
-    public void reset() {
-      allMessages = new CountDownLatch(5);
-    }
-  }
-
-  @BeforeClass
-  public static void initializeConnections() throws Exception {
-    // start server
-    System.out.println("starting server...");
-    mailService = new MailImpl();
-    Responder responder = new SpecificResponder(Mail.class, mailService);
+  
+  protected static Server initializeServer(Responder responder) {
     ChannelFactory channelFactory = new NioServerSocketChannelFactory(
         Executors.newCachedThreadPool(),
         Executors.newCachedThreadPool()
     );
-    server = new NettyServer(responder, new InetSocketAddress(0),
-                             channelFactory, new SSLChannelPipelineFactory(),
-                             null);
-    server.start();
-
-    int serverPort = server.getPort();
-    System.out.println("server port : " + serverPort);
-
-    transceiver = new NettyTransceiver(new InetSocketAddress(serverPort),
-                                       new SSLChannelFactory(),
-                                       CONNECT_TIMEOUT_MILLIS);
-    proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+    return new NettyServer(responder, new InetSocketAddress(0),
+        channelFactory, new SSLChannelPipelineFactory(),
+        null);
+  }
+  
+  protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
+    return  new NettyTransceiver(new InetSocketAddress(serverPort),
+        new SSLChannelFactory(),
+        CONNECT_TIMEOUT_MILLIS);
   }
 
-  @AfterClass
-  public static void tearDownConnections() throws Exception {
-    transceiver.close();
-    server.close();
-  }
-
-  @Test
-  public void testRequestResponse() throws Exception {
-    for (int x = 0; x < 5; x++) {
-      verifyResponse(proxy.send(createMessage()));
-    }
-  }
-
-  private void verifyResponse(String result) {
-    Assert.assertEquals(
-        "Sent message to [wife] from [husband] with body [I love you!]",
-        result.toString());
-  }
-
-  @Test
-  public void testOneway() throws Exception {
-    for (int x = 0; x < 5; x++) {
-      proxy.fireandforget(createMessage());
-    }
-    mailService.awaitMessages();
-    mailService.assertAllMessagesReceived();
-  }
-
-  @Test
-  public void testMixtureOfRequests() throws Exception {
-    mailService.reset();
-    for (int x = 0; x < 5; x++) {
-      Message createMessage = createMessage();
-      proxy.fireandforget(createMessage);
-      verifyResponse(proxy.send(createMessage));
-    }
-    mailService.awaitMessages();
-    mailService.assertAllMessagesReceived();
-
-  }
-
-  private Message createMessage() {
-    Message msg = Message.newBuilder().
-        setTo("wife").
-        setFrom("husband").
-        setBody("I love you!").
-        build();
-    return msg;
-  }
 
   /**
    * Factory of SSL-enabled client channels