You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2013/02/14 23:26:18 UTC
svn commit: r1446372 - in /avro/trunk: ./
lang/java/ipc/src/test/java/org/apache/avro/ipc/
Author: cutting
Date: Thu Feb 14 22:26:18 2013
New Revision: 1446372
URL: http://svn.apache.org/r1446372
Log:
Java: Add TestNettyServerWithCompression, illustrating how one can add compression to Avro Netty-based RPC. Contributed by Ted Malaska.
Added:
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1446372&r1=1446371&r2=1446372&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Feb 14 22:26:18 2013
@@ -47,6 +47,10 @@ Trunk (not yet released)
AVRO-1255. Python: Make 'names' parameter optional in to_json methods.
(Jeremy Kahn via cutting)
+ AVRO-1251. Java: Add TestNettyServerWithCompression, illustrating
+ how one can add compression to Avro Netty-based RPC.
+ (Ted Malaska via cutting)
+
BUG FIXES
AVRO-1231. Java: Fix Trevni shredder to work on non-recursive
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java?rev=1446372&r1=1446371&r2=1446372&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServer.java Thu Feb 14 22:26:18 2013
@@ -80,17 +80,25 @@ public class TestNettyServer {
System.out.println("starting server...");
mailService = new MailImpl();
Responder responder = new SpecificResponder(Mail.class, mailService);
- server = new NettyServer(responder, new InetSocketAddress(0));
+ server = initializeServer(responder);
server.start();
int serverPort = server.getPort();
System.out.println("server port : " + serverPort);
- transceiver = new NettyTransceiver(new InetSocketAddress(
- serverPort), CONNECT_TIMEOUT_MILLIS);
+ transceiver = initializeTransceiver(serverPort);
proxy = SpecificRequestor.getClient(Mail.class, transceiver);
}
+ protected static Server initializeServer(Responder responder) { // Builds the server the setup code above starts; port 0 lets the OS pick, read back via server.getPort().
+ return new NettyServer(responder, new InetSocketAddress(0));
+ }
+
+ protected static Transceiver initializeTransceiver(int serverPort) throws IOException { // Builds the client transceiver for the port the started server reported.
+ return new NettyTransceiver(new InetSocketAddress(
+ serverPort), CONNECT_TIMEOUT_MILLIS); // second argument is the connect timeout constant, in milliseconds per its name
+ }
+
@AfterClass
public static void tearDownConnections() throws Exception{
transceiver.close();
Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java?rev=1446372&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java Thu Feb 14 22:26:18 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro.ipc;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Mail;
+import org.apache.avro.test.Message;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.codec.compression.ZlibDecoder;
+import org.jboss.netty.handler.codec.compression.ZlibEncoder;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertEquals;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestNettyServerWithCompression extends TestNettyServer{
+ // Subclass of TestNettyServer that supplies server and client factories whose pipelines carry zlib inflater/deflater handlers.
+ // NOTE(review): these static methods hide (not override) TestNettyServer's; its unqualified calls bind to its own copies -- confirm compression is actually exercised.
+ protected static Server initializeServer(Responder responder) { // server side: NettyServer with the compressing pipeline factory
+ ChannelFactory channelFactory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()
+ );
+ return new NettyServer(responder, new InetSocketAddress(0),
+ channelFactory, new CompressionChannelPipelineFactory(),
+ null); // presumably an optional execution handler left unset -- TODO confirm against the NettyServer constructor
+ }
+ // Client side: transceiver to serverPort built on the compressing channel factory, using CONNECT_TIMEOUT_MILLIS.
+ protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
+ return new NettyTransceiver(new InetSocketAddress(serverPort),
+ new CompressionChannelFactory(),
+ CONNECT_TIMEOUT_MILLIS);
+ }
+
+ // In both factories below addFirst is called twice, so "inflater" ends up at the head of the pipeline, ahead of "deflater".
+ /**
+ * Factory of Compression-enabled client channels
+ */
+ private static class CompressionChannelFactory extends NioClientSocketChannelFactory {
+ public CompressionChannelFactory() {
+ super(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+
+ @Override
+ public SocketChannel newChannel(ChannelPipeline pipeline) {
+ try {
+ ZlibEncoder encoder = new ZlibEncoder(6); // 6 is the zlib compression level argument
+ pipeline.addFirst("deflater", encoder);
+ pipeline.addFirst("inflater", new ZlibDecoder());
+ return super.newChannel(pipeline); // handlers are installed on the caller's pipeline before the channel is created
+ } catch (Exception ex) { // any failure is rethrown unchecked with the original exception kept as the cause
+ throw new RuntimeException("Cannot create Compression channel", ex);
+ }
+ }
+ }
+
+ // Server counterpart: every getPipeline() call builds a fresh pipeline holding the same two zlib handlers.
+
+ /**
+ * Factory of Compression-enabled server worker channel pipelines
+ */
+ private static class CompressionChannelPipelineFactory
+ implements ChannelPipelineFactory {
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ ZlibEncoder encoder = new ZlibEncoder(6); // same compression level as the client factory
+ pipeline.addFirst("deflater", encoder);
+ pipeline.addFirst("inflater", new ZlibDecoder());
+ return pipeline;
+ }
+ }
+}
Propchange: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithCompression.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java?rev=1446372&r1=1446371&r2=1446372&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java (original)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java Thu Feb 14 22:26:18 2013
@@ -18,24 +18,18 @@
package org.apache.avro.ipc;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.KeyStore;
import java.security.Security;
import java.security.cert.X509Certificate;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
-import junit.framework.Assert;
-import org.apache.avro.ipc.specific.SpecificRequestor;
-import org.apache.avro.ipc.specific.SpecificResponder;
-import org.apache.avro.test.Mail;
-import org.apache.avro.test.Message;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@@ -44,121 +38,27 @@ import org.jboss.netty.channel.socket.So
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.ssl.SslHandler;
-import org.junit.AfterClass;
-import static org.junit.Assert.assertEquals;
-import org.junit.BeforeClass;
-import org.junit.Test;
-public class TestNettyServerWithSSL {
+public class TestNettyServerWithSSL extends TestNettyServer{
public static final String TEST_CERTIFICATE = "servercert.p12";
public static final String TEST_CERTIFICATE_PASSWORD = "s3cret";
- static final long CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
- private static Server server;
- private static Transceiver transceiver;
- private static Mail proxy;
- private static MailImpl mailService;
-
- public static class MailImpl implements Mail {
-
- private CountDownLatch allMessages = new CountDownLatch(5);
-
- // in this simple example just return details of the message
- public String send(Message message) {
- return "Sent message to [" + message.getTo().toString() +
- "] from [" + message.getFrom().toString() + "] with body [" +
- message.getBody().toString() + "]";
- }
-
- public void fireandforget(Message message) {
- allMessages.countDown();
- }
-
- private void awaitMessages() throws InterruptedException {
- allMessages.await(2, TimeUnit.SECONDS);
- }
-
- private void assertAllMessagesReceived() {
- assertEquals(0, allMessages.getCount());
- }
-
- public void reset() {
- allMessages = new CountDownLatch(5);
- }
- }
-
- @BeforeClass
- public static void initializeConnections() throws Exception {
- // start server
- System.out.println("starting server...");
- mailService = new MailImpl();
- Responder responder = new SpecificResponder(Mail.class, mailService);
+
+ protected static Server initializeServer(Responder responder) {
ChannelFactory channelFactory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()
);
- server = new NettyServer(responder, new InetSocketAddress(0),
- channelFactory, new SSLChannelPipelineFactory(),
- null);
- server.start();
-
- int serverPort = server.getPort();
- System.out.println("server port : " + serverPort);
-
- transceiver = new NettyTransceiver(new InetSocketAddress(serverPort),
- new SSLChannelFactory(),
- CONNECT_TIMEOUT_MILLIS);
- proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+ return new NettyServer(responder, new InetSocketAddress(0),
+ channelFactory, new SSLChannelPipelineFactory(),
+ null);
+ }
+
+ protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
+ return new NettyTransceiver(new InetSocketAddress(serverPort),
+ new SSLChannelFactory(),
+ CONNECT_TIMEOUT_MILLIS);
}
- @AfterClass
- public static void tearDownConnections() throws Exception {
- transceiver.close();
- server.close();
- }
-
- @Test
- public void testRequestResponse() throws Exception {
- for (int x = 0; x < 5; x++) {
- verifyResponse(proxy.send(createMessage()));
- }
- }
-
- private void verifyResponse(String result) {
- Assert.assertEquals(
- "Sent message to [wife] from [husband] with body [I love you!]",
- result.toString());
- }
-
- @Test
- public void testOneway() throws Exception {
- for (int x = 0; x < 5; x++) {
- proxy.fireandforget(createMessage());
- }
- mailService.awaitMessages();
- mailService.assertAllMessagesReceived();
- }
-
- @Test
- public void testMixtureOfRequests() throws Exception {
- mailService.reset();
- for (int x = 0; x < 5; x++) {
- Message createMessage = createMessage();
- proxy.fireandforget(createMessage);
- verifyResponse(proxy.send(createMessage));
- }
- mailService.awaitMessages();
- mailService.assertAllMessagesReceived();
-
- }
-
- private Message createMessage() {
- Message msg = Message.newBuilder().
- setTo("wife").
- setFrom("husband").
- setBody("I love you!").
- build();
- return msg;
- }
/**
* Factory of SSL-enabled client channels