You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@avro.apache.org by cu...@apache.org on 2012/07/05 23:41:47 UTC
svn commit: r1357944 - in /avro/trunk: ./
lang/java/ipc/src/main/java/org/apache/avro/ipc/
lang/java/ipc/src/test/java/org/apache/avro/ipc/
lang/java/ipc/src/test/resources/ lang/java/ipc/src/test/resources/org/
lang/java/ipc/src/test/resources/org/apa...
Author: cutting
Date: Thu Jul 5 21:41:47 2012
New Revision: 1357944
URL: http://svn.apache.org/viewvc?rev=1357944&view=rev
Log:
AVRO-1119. Java: Permit NettyServer to be used with SSL. Contributed by Sebastian Ortega.
Added:
avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java (with props)
avro/trunk/lang/java/ipc/src/test/resources/
avro/trunk/lang/java/ipc/src/test/resources/org/
avro/trunk/lang/java/ipc/src/test/resources/org/apache/
avro/trunk/lang/java/ipc/src/test/resources/org/apache/avro/
avro/trunk/lang/java/ipc/src/test/resources/org/apache/avro/ipc/
avro/trunk/lang/java/ipc/src/test/resources/org/apache/avro/ipc/servercert.p12 (with props)
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
Modified: avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1357944&r1=1357943&r2=1357944&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Thu Jul 5 21:41:47 2012
@@ -24,6 +24,9 @@ Avro 1.7.1 (unreleased)
AVRO-1120. Let AvroMultipleOutput jobs use multiple schemas with
map-only jobs. (Ashish Nagavaram via cutting)
+ AVRO-1119. Java: Permit NettyServer to be used with SSL.
+ (Sebastian Ortega via cutting)
+
BUG FIXES
AVRO-1114. Java: Update license headers for new mapreduce code. (cutting)
Modified: avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java?rev=1357944&r1=1357943&r2=1357944&view=diff
==============================================================================
--- avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java (original)
+++ avro/trunk/lang/java/ipc/src/main/java/org/apache/avro/ipc/NettyServer.java Thu Jul 5 21:41:47 2012
@@ -74,34 +74,54 @@ public class NettyServer implements Serv
this(responder, addr, channelFactory, null);
}
- /**
- *
- * @param executionHandler if not null, will be inserted into the Netty
- * pipeline. Use this when your responder does
- * long, non-cpu bound processing (see Netty's
- * ExecutionHandler javadoc).
- */
+ /**
+ * @param executionHandler if not null, will be inserted into the Netty
+ * pipeline. Use this when your responder does
+ * long, non-cpu bound processing (see Netty's
+ * ExecutionHandler javadoc).
+ * @param pipelineFactory Avro-related handlers will be added on top of
+ * what this factory creates
+ */
+ public NettyServer(Responder responder, InetSocketAddress addr,
+ ChannelFactory channelFactory,
+ final ChannelPipelineFactory pipelineFactory,
+ final ExecutionHandler executionHandler) {
+ this.responder = responder;
+ this.channelFactory = channelFactory;
+ this.executionHandler = executionHandler;
+ ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
+ bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline p = pipelineFactory.getPipeline();
+ p.addLast("frameDecoder", new NettyFrameDecoder());
+ p.addLast("frameEncoder", new NettyFrameEncoder());
+ if (executionHandler != null) {
+ p.addLast("executionHandler", executionHandler);
+ }
+ p.addLast("handler", new NettyServerAvroHandler());
+ return p;
+ }
+ });
+ serverChannel = bootstrap.bind(addr);
+ allChannels.add(serverChannel);
+ }
+
+ /**
+ * @param executionHandler if not null, will be inserted into the Netty
+ * pipeline. Use this when your responder does
+ * long, non-cpu bound processing (see Netty's
+ * ExecutionHandler javadoc).
+ */
public NettyServer(Responder responder, InetSocketAddress addr,
- ChannelFactory channelFactory, final ExecutionHandler executionHandler) {
- this.responder = responder;
- this.channelFactory = channelFactory;
- this.executionHandler = executionHandler;
- ServerBootstrap bootstrap = new ServerBootstrap(channelFactory);
- bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline p = Channels.pipeline();
- p.addLast("frameDecoder", new NettyFrameDecoder());
- p.addLast("frameEncoder", new NettyFrameEncoder());
- if (executionHandler != null) {
- p.addLast("executionHandler", executionHandler);
- }
- p.addLast("handler", new NettyServerAvroHandler());
- return p;
- }
- });
- serverChannel = bootstrap.bind(addr);
- allChannels.add(serverChannel);
+ ChannelFactory channelFactory,
+ final ExecutionHandler executionHandler) {
+ this(responder, addr, channelFactory, new ChannelPipelineFactory() {
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ return Channels.pipeline();
+ }
+ }, executionHandler);
}
@Override
Added: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java?rev=1357944&view=auto
==============================================================================
--- avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java (added)
+++ avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java Thu Jul 5 21:41:47 2012
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.net.InetSocketAddress;
+import java.security.KeyStore;
+import java.security.Security;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import junit.framework.Assert;
+import org.apache.avro.ipc.specific.SpecificRequestor;
+import org.apache.avro.ipc.specific.SpecificResponder;
+import org.apache.avro.test.Mail;
+import org.apache.avro.test.Message;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.handler.ssl.SslHandler;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertEquals;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestNettyServerWithSSL {
+ public static final String TEST_CERTIFICATE = "servercert.p12";
+ public static final String TEST_CERTIFICATE_PASSWORD = "s3cret";
+ static final long CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
+ private static Server server;
+ private static Transceiver transceiver;
+ private static Mail proxy;
+ private static MailImpl mailService;
+
+ public static class MailImpl implements Mail {
+
+ private CountDownLatch allMessages = new CountDownLatch(5);
+
+ // in this simple example just return details of the message
+ public String send(Message message) {
+ return "Sent message to [" + message.getTo().toString() +
+ "] from [" + message.getFrom().toString() + "] with body [" +
+ message.getBody().toString() + "]";
+ }
+
+ public void fireandforget(Message message) {
+ allMessages.countDown();
+ }
+
+ private void awaitMessages() throws InterruptedException {
+ allMessages.await(2, TimeUnit.SECONDS);
+ }
+
+ private void assertAllMessagesReceived() {
+ assertEquals(0, allMessages.getCount());
+ }
+
+ public void reset() {
+ allMessages = new CountDownLatch(5);
+ }
+ }
+
+ @BeforeClass
+ public static void initializeConnections() throws Exception {
+ // start server
+ System.out.println("starting server...");
+ mailService = new MailImpl();
+ Responder responder = new SpecificResponder(Mail.class, mailService);
+ ChannelFactory channelFactory = new NioServerSocketChannelFactory(
+ Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()
+ );
+ server = new NettyServer(responder, new InetSocketAddress(0),
+ channelFactory, new SSLChannelPipelineFactory(),
+ null);
+ server.start();
+
+ int serverPort = server.getPort();
+ System.out.println("server port : " + serverPort);
+
+ transceiver = new NettyTransceiver(new InetSocketAddress(serverPort),
+ new SSLChannelFactory(),
+ CONNECT_TIMEOUT_MILLIS);
+ proxy = SpecificRequestor.getClient(Mail.class, transceiver);
+ }
+
+ @AfterClass
+ public static void tearDownConnections() throws Exception {
+ transceiver.close();
+ server.close();
+ }
+
+ @Test
+ public void testRequestResponse() throws Exception {
+ for (int x = 0; x < 5; x++) {
+ verifyResponse(proxy.send(createMessage()));
+ }
+ }
+
+ private void verifyResponse(String result) {
+ Assert.assertEquals(
+ "Sent message to [wife] from [husband] with body [I love you!]",
+ result.toString());
+ }
+
+ @Test
+ public void testOneway() throws Exception {
+ for (int x = 0; x < 5; x++) {
+ proxy.fireandforget(createMessage());
+ }
+ mailService.awaitMessages();
+ mailService.assertAllMessagesReceived();
+ }
+
+ @Test
+ public void testMixtureOfRequests() throws Exception {
+ mailService.reset();
+ for (int x = 0; x < 5; x++) {
+ Message createMessage = createMessage();
+ proxy.fireandforget(createMessage);
+ verifyResponse(proxy.send(createMessage));
+ }
+ mailService.awaitMessages();
+ mailService.assertAllMessagesReceived();
+
+ }
+
+ private Message createMessage() {
+ Message msg = Message.newBuilder().
+ setTo("wife").
+ setFrom("husband").
+ setBody("I love you!").
+ build();
+ return msg;
+ }
+
+ /**
+ * Factory of SSL-enabled client channels
+ */
+ private static class SSLChannelFactory extends NioClientSocketChannelFactory {
+ public SSLChannelFactory() {
+ super(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+ }
+
+ @Override
+ public SocketChannel newChannel(ChannelPipeline pipeline) {
+ try {
+ SSLContext sslContext = SSLContext.getInstance("TLS");
+ sslContext.init(null, new TrustManager[]{new BogusTrustManager()},
+ null);
+ SSLEngine sslEngine = sslContext.createSSLEngine();
+ sslEngine.setUseClientMode(true);
+ pipeline.addFirst("ssl", new SslHandler(sslEngine));
+ return super.newChannel(pipeline);
+ } catch (Exception ex) {
+ throw new RuntimeException("Cannot create SSL channel", ex);
+ }
+ }
+ }
+
+ /**
+ * Bogus trust manager accepting any certificate
+ */
+ private static class BogusTrustManager implements X509TrustManager {
+ @Override
+ public void checkClientTrusted(X509Certificate[] certs, String s) {
+ // nothing
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] certs, String s) {
+ // nothing
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ }
+
+ /**
+ * Factory of SSL-enabled server worker channel pipelines
+ */
+ private static class SSLChannelPipelineFactory
+ implements ChannelPipelineFactory {
+
+ private SSLContext createServerSSLContext() {
+ try {
+ KeyStore ks = KeyStore.getInstance("PKCS12");
+ ks.load(
+ TestNettyServer.class.getResource(TEST_CERTIFICATE).openStream(),
+ TEST_CERTIFICATE_PASSWORD.toCharArray());
+
+ // Set up key manager factory to use our key store
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance(getAlgorithm());
+ kmf.init(ks, TEST_CERTIFICATE_PASSWORD.toCharArray());
+
+ SSLContext serverContext = SSLContext.getInstance("TLS");
+ serverContext.init(kmf.getKeyManagers(), null, null);
+ return serverContext;
+ } catch (Exception e) {
+ throw new Error("Failed to initialize the server-side SSLContext", e);
+ }
+ }
+
+ private String getAlgorithm() {
+ String algorithm = Security.getProperty(
+ "ssl.KeyManagerFactory.algorithm");
+ if (algorithm == null) {
+ algorithm = "SunX509";
+ }
+ return algorithm;
+ }
+
+ @Override
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ SSLEngine sslEngine = createServerSSLContext().createSSLEngine();
+ sslEngine.setUseClientMode(false);
+ pipeline.addLast("ssl", new SslHandler(sslEngine));
+ return pipeline;
+ }
+ }
+}
Propchange: avro/trunk/lang/java/ipc/src/test/java/org/apache/avro/ipc/TestNettyServerWithSSL.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: avro/trunk/lang/java/ipc/src/test/resources/org/apache/avro/ipc/servercert.p12
URL: http://svn.apache.org/viewvc/avro/trunk/lang/java/ipc/src/test/resources/org/apache/avro/ipc/servercert.p12?rev=1357944&view=auto
==============================================================================
Binary file - no diff available.
Propchange: avro/trunk/lang/java/ipc/src/test/resources/org/apache/avro/ipc/servercert.p12
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream