You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/22 23:52:28 UTC

[1/2] qpid-jms git commit: move these to a matching test package

Repository: qpid-jms
Updated Branches:
  refs/heads/master 3d31992d5 -> 9d2ed0d66


move these to a matching test package

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/10981041
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/10981041
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/10981041

Branch: refs/heads/master
Commit: 10981041d41b2d7455813f90008d5f8efb98205b
Parents: 3d31992
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jan 22 15:12:00 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jan 22 15:12:00 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/test/netty/NettyEchoServer.java    | 155 --------
 .../jms/test/netty/NettyTcpTransportTest.java   | 353 -------------------
 .../jms/transports/netty/NettyEchoServer.java   | 155 ++++++++
 .../transports/netty/NettyTcpTransportTest.java | 353 +++++++++++++++++++
 4 files changed, 508 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
deleted file mode 100644
index 42bc535..0000000
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.test.netty;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.concurrent.Future;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.ServerSocketFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple Netty Server used to echo all data.
- */
-public class NettyEchoServer implements AutoCloseable {
-    private static final Logger LOG = LoggerFactory.getLogger(NettyEchoServer.class);
-
-    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
-    static final int TIMEOUT = 5000;
-
-    private EventLoopGroup bossGroup;
-    private EventLoopGroup workerGroup;
-    private Channel serverChannel;
-    private int serverPort;
-
-    private final AtomicBoolean started = new AtomicBoolean();
-
-    public void start() throws Exception {
-
-        if (started.compareAndSet(false, true)) {
-
-            // Configure the server.
-            bossGroup = new NioEventLoopGroup(1);
-            workerGroup = new NioEventLoopGroup();
-
-            ServerBootstrap server = new ServerBootstrap();
-            server.group(bossGroup, workerGroup);
-            server.channel(NioServerSocketChannel.class);
-            server.option(ChannelOption.SO_BACKLOG, 100);
-            server.handler(new LoggingHandler(LogLevel.INFO));
-            server.childHandler(new ChannelInitializer<Channel>() {
-                @Override
-                public void initChannel(Channel ch) throws Exception {
-                    ch.pipeline().addLast(new EchoServerHandler());
-                }
-            });
-
-            // Start the server.
-            serverChannel = server.bind(getServerPort()).sync().channel();
-        }
-    }
-
-    public void stop() throws InterruptedException {
-        if (started.compareAndSet(true, false)) {
-            try {
-                LOG.info("Syncing channel close");
-                serverChannel.close().sync();
-            } catch (InterruptedException e) {
-            }
-            // Shut down all event loops to terminate all threads.
-            LOG.info("Shutting down boss group");
-            Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
-            LOG.info("Shutting down worker group");
-            Future<?> workerFuture = workerGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
-
-            LOG.info("Awaiting boss group shutdown");
-            boolean bossShutdown = bossFuture.await(TIMEOUT + 500);
-
-            LOG.info("Awaiting worker group shutdown");
-            boolean workerShutdown = workerFuture.await(TIMEOUT + 500);
-
-            if (!bossShutdown) {
-                throw new RuntimeException("Failed to shut down bossGroup in allotted time");
-            }
-            if (!workerShutdown) {
-                throw new RuntimeException("Failed to shut down workerGroup in allotted time");
-            }
-        }
-    }
-
-    @Override
-    public void close() throws InterruptedException {
-        stop();
-    }
-
-    public int getServerPort() {
-        if (serverPort == 0) {
-            ServerSocket ss = null;
-            try {
-                ss = ServerSocketFactory.getDefault().createServerSocket(0);
-                serverPort = ss.getLocalPort();
-            } catch (IOException e) { // revert back to default
-                serverPort = PORT;
-            } finally {
-                try {
-                    if (ss != null ) {
-                        ss.close();
-                    }
-                } catch (IOException e) { // ignore
-                }
-            }
-        }
-        return serverPort;
-    }
-
-    private class EchoServerHandler extends ChannelInboundHandlerAdapter {
-
-        @Override
-        public void channelRead(ChannelHandlerContext ctx, Object msg) {
-            ctx.write(msg);
-        }
-
-        @Override
-        public void channelReadComplete(ChannelHandlerContext ctx) {
-            ctx.flush();
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-            // Close the connection when an exception is raised.
-            cause.printStackTrace();
-            ctx.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
deleted file mode 100644
index 448b1be..0000000
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.test.netty;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.qpid.jms.test.QpidJmsTestCase;
-import org.apache.qpid.jms.test.Wait;
-import org.apache.qpid.jms.transports.TransportListener;
-import org.apache.qpid.jms.transports.TransportOptions;
-import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test basic functionality of the Netty based TCP transport.
- */
-public class NettyTcpTransportTest extends QpidJmsTestCase {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
-
-    private static final int SEND_BYTE_COUNT = 1024;
-
-    private boolean transportClosed;
-    private final List<Throwable> exceptions = new ArrayList<Throwable>();
-    private final List<ByteBuf> data = new ArrayList<ByteBuf>();
-    private final AtomicInteger bytesRead = new AtomicInteger();
-
-    private final TransportListener testListener = new NettyTransportListener();
-    private final TransportOptions testOptions = new TransportOptions();
-
-    @Test(timeout = 60 * 1000)
-    public void testConnectToServer() throws Exception {
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            transport.close();
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
-        assertTrue(exceptions.isEmpty());
-        assertTrue(data.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleConnectionsToServer() throws Exception {
-        final int CONNECTION_COUNT = 25;
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
-
-            for (int i = 0; i < CONNECTION_COUNT; ++i) {
-                NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
-                try {
-                    transport.connect();
-                    assertTrue(transport.isConnected());
-                    LOG.info("Connected to test server.");
-                    transports.add(transport);
-                } catch (Exception e) {
-                    fail("Should have connected to the server");
-                }
-            }
-
-            for (NettyTcpTransport transport : transports) {
-                transport.close();
-            }
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
-        assertTrue(exceptions.isEmpty());
-        assertTrue(data.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleConnectionsSendReceive() throws Exception {
-        final int CONNECTION_COUNT = 25;
-        final int FRAME_SIZE = 8;
-
-        ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
-        for (int i = 0; i < 8; ++i) {
-            sendBuffer.writeByte('A');
-        }
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
-
-            for (int i = 0; i < CONNECTION_COUNT; ++i) {
-                NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
-                try {
-                    transport.connect();
-                    transport.send(sendBuffer.copy());
-                    transports.add(transport);
-                } catch (Exception e) {
-                    fail("Should have connected to the server");
-                }
-            }
-
-            assertTrue(Wait.waitFor(new Wait.Condition() {
-
-                @Override
-                public boolean isSatisified() throws Exception {
-                    LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT));
-                    return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT);
-                }
-            }));
-
-            for (NettyTcpTransport transport : transports) {
-                transport.close();
-            }
-        }
-
-        assertTrue(exceptions.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testDetectServerClose() throws Exception {
-        NettyTcpTransport transport = null;
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            server.close();
-        }
-
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return transportClosed;
-            }
-        }));
-        assertTrue(exceptions.isEmpty());
-        assertTrue(data.isEmpty());
-        assertFalse(transport.isConnected());
-
-        try {
-            transport.close();
-        } catch (Exception ex) {
-            fail("Close of a disconnect transport should not generate errors");
-        }
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testDataSentIsReceived() throws Exception {
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-
-            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
-            for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
-                sendBuffer.writeByte('A');
-            }
-
-            transport.send(sendBuffer);
-
-            assertTrue(Wait.waitFor(new Wait.Condition() {
-
-                @Override
-                public boolean isSatisified() throws Exception {
-                    return !data.isEmpty();
-                }
-            }));
-
-            assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes());
-
-            transport.close();
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
-        assertTrue(exceptions.isEmpty());
-    }
-
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleDataPacketsSentAreReceived() throws Exception {
-        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1);
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws Exception {
-        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10);
-    }
-
-    public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception {
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            ByteBuf sendBuffer = Unpooled.buffer(byteCount);
-            for (int i = 0; i < byteCount; ++i) {
-                sendBuffer.writeByte('A');
-            }
-
-            for (int i = 0; i < iterations; ++i) {
-                transport.send(sendBuffer.copy());
-            }
-
-            assertTrue(Wait.waitFor(new Wait.Condition() {
-
-                @Override
-                public boolean isSatisified() throws Exception {
-                    return bytesRead.get() == (byteCount * iterations);
-                }
-            }));
-
-            transport.close();
-        }
-
-        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
-        assertTrue(exceptions.isEmpty());
-    }
-
-    @Test(timeout = 60 * 1000)
-    public void testSendToClosedTransportFails() throws Exception {
-        NettyTcpTransport transport = null;
-
-        try (NettyEchoServer server = new NettyEchoServer()) {
-            server.start();
-
-            int port = server.getServerPort();
-            URI serverLocation = new URI("tcp://localhost:" + port);
-
-            transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
-            try {
-                transport.connect();
-                LOG.info("Connected to test server.");
-            } catch (Exception e) {
-                fail("Should have connected to the server");
-            }
-
-            assertTrue(transport.isConnected());
-
-            transport.close();
-
-            ByteBuf sendBuffer = Unpooled.buffer(10);
-            try {
-                transport.send(sendBuffer);
-                fail("Should throw on send of closed transport");
-            } catch (IOException ex) {
-            }
-        }
-    }
-
-    private class NettyTransportListener implements TransportListener {
-
-        @Override
-        public void onData(ByteBuf incoming) {
-            LOG.debug("Client has new incoming data of size: {}", incoming.readableBytes());
-            data.add(incoming);
-            bytesRead.addAndGet(incoming.readableBytes());
-        }
-
-        @Override
-        public void onTransportClosed() {
-            LOG.debug("Transport reports that it has closed.");
-            transportClosed = true;
-        }
-
-        @Override
-        public void onTransportError(Throwable cause) {
-            LOG.debug("Transport error caught: {}", cause.getMessage());
-            exceptions.add(cause);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
new file mode 100644
index 0000000..592ed1d
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.Future;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple Netty Server used to echo all data.
+ */
+public class NettyEchoServer implements AutoCloseable {
+    private static final Logger LOG = LoggerFactory.getLogger(NettyEchoServer.class);
+
+    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
+    static final int TIMEOUT = 5000;
+
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private Channel serverChannel;
+    private int serverPort;
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    public void start() throws Exception {
+
+        if (started.compareAndSet(false, true)) {
+
+            // Configure the server.
+            bossGroup = new NioEventLoopGroup(1);
+            workerGroup = new NioEventLoopGroup();
+
+            ServerBootstrap server = new ServerBootstrap();
+            server.group(bossGroup, workerGroup);
+            server.channel(NioServerSocketChannel.class);
+            server.option(ChannelOption.SO_BACKLOG, 100);
+            server.handler(new LoggingHandler(LogLevel.INFO));
+            server.childHandler(new ChannelInitializer<Channel>() {
+                @Override
+                public void initChannel(Channel ch) throws Exception {
+                    ch.pipeline().addLast(new EchoServerHandler());
+                }
+            });
+
+            // Start the server.
+            serverChannel = server.bind(getServerPort()).sync().channel();
+        }
+    }
+
+    public void stop() throws InterruptedException {
+        if (started.compareAndSet(true, false)) {
+            try {
+                LOG.info("Syncing channel close");
+                serverChannel.close().sync();
+            } catch (InterruptedException e) {
+            }
+            // Shut down all event loops to terminate all threads.
+            LOG.info("Shutting down boss group");
+            Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
+            LOG.info("Shutting down worker group");
+            Future<?> workerFuture = workerGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
+
+            LOG.info("Awaiting boss group shutdown");
+            boolean bossShutdown = bossFuture.await(TIMEOUT + 500);
+
+            LOG.info("Awaiting worker group shutdown");
+            boolean workerShutdown = workerFuture.await(TIMEOUT + 500);
+
+            if (!bossShutdown) {
+                throw new RuntimeException("Failed to shut down bossGroup in allotted time");
+            }
+            if (!workerShutdown) {
+                throw new RuntimeException("Failed to shut down workerGroup in allotted time");
+            }
+        }
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        stop();
+    }
+
+    public int getServerPort() {
+        if (serverPort == 0) {
+            ServerSocket ss = null;
+            try {
+                ss = ServerSocketFactory.getDefault().createServerSocket(0);
+                serverPort = ss.getLocalPort();
+            } catch (IOException e) { // revert back to default
+                serverPort = PORT;
+            } finally {
+                try {
+                    if (ss != null ) {
+                        ss.close();
+                    }
+                } catch (IOException e) { // ignore
+                }
+            }
+        }
+        return serverPort;
+    }
+
+    private class EchoServerHandler extends ChannelInboundHandlerAdapter {
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            ctx.write(msg);
+        }
+
+        @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) {
+            ctx.flush();
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+            // Close the connection when an exception is raised.
+            cause.printStackTrace();
+            ctx.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
new file mode 100644
index 0000000..e65a1b7
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic functionality of the Netty based TCP transport.
+ */
+public class NettyTcpTransportTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
+
+    private static final int SEND_BYTE_COUNT = 1024;
+
+    private volatile boolean transportClosed; // set in onTransportClosed(), polled by the test thread via Wait.waitFor (presumably from a different thread)
+    private final List<Throwable> exceptions = java.util.Collections.synchronizedList(new ArrayList<Throwable>()); // filled in onTransportError()
+    private final List<ByteBuf> data = java.util.Collections.synchronizedList(new ArrayList<ByteBuf>()); // filled in onData()
+    private final AtomicInteger bytesRead = new AtomicInteger();
+
+    private final TransportListener testListener = new NettyTransportListener();
+    private final TransportOptions testOptions = new TransportOptions();
+
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServer() throws Exception {
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleConnectionsToServer() throws Exception {
+        final int CONNECTION_COUNT = 25;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
+
+            for (int i = 0; i < CONNECTION_COUNT; ++i) {
+                NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+                try {
+                    transport.connect();
+                    assertTrue(transport.isConnected());
+                    LOG.info("Connected to test server.");
+                    transports.add(transport);
+                } catch (Exception e) {
+                    fail("Should have connected to the server");
+                }
+            }
+
+            for (NettyTcpTransport transport : transports) {
+                transport.close();
+            }
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleConnectionsSendReceive() throws Exception {
+        final int CONNECTION_COUNT = 25;
+        final int FRAME_SIZE = 8;
+
+        ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
+        for (int i = 0; i < FRAME_SIZE; ++i) { // fill exactly one frame so the expected byte count below stays in sync
+            sendBuffer.writeByte('A');
+        }
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
+
+            for (int i = 0; i < CONNECTION_COUNT; ++i) {
+                NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+                try {
+                    transport.connect();
+                    transport.send(sendBuffer.copy()); // each connection sends its own copy of the frame
+                    transports.add(transport);
+                } catch (Exception e) {
+                    fail("Should have connected to the server");
+                }
+            }
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT));
+                    return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT);
+                }
+            }));
+
+            for (NettyTcpTransport transport : transports) {
+                transport.close();
+            }
+        }
+
+        assertTrue(exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDetectServerClose() throws Exception {
+        NettyTcpTransport transport = null;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            server.close();
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return transportClosed;
+            }
+        }));
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+        assertFalse(transport.isConnected());
+
+        try {
+            transport.close();
+        } catch (Exception ex) {
+            fail("Close of a disconnect transport should not generate errors");
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDataSentIsReceived() throws Exception {
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
+            for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
+                sendBuffer.writeByte('A');
+            }
+
+            transport.send(sendBuffer);
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !data.isEmpty();
+                }
+            }));
+
+            assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes());
+
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+    }
+
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleDataPacketsSentAreReceived() throws Exception {
+        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1);
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws Exception {
+        doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10);
+    }
+
+    public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception {
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            ByteBuf sendBuffer = Unpooled.buffer(byteCount);
+            for (int i = 0; i < byteCount; ++i) {
+                sendBuffer.writeByte('A');
+            }
+
+            for (int i = 0; i < iterations; ++i) {
+                transport.send(sendBuffer.copy());
+            }
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return bytesRead.get() == (byteCount * iterations);
+                }
+            }));
+
+            transport.close();
+        }
+
+        assertTrue(!transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testSendToClosedTransportFails() throws Exception {
+        NettyTcpTransport transport = null;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            transport.close();
+
+            ByteBuf sendBuffer = Unpooled.buffer(10);
+            try {
+                transport.send(sendBuffer);
+                fail("Should throw on send of closed transport");
+            } catch (IOException ex) {
+            }
+        }
+    }
+
+    private class NettyTransportListener implements TransportListener {
+
+        @Override
+        public void onData(ByteBuf incoming) {
+            LOG.debug("Client has new incoming data of size: {}", incoming.readableBytes());
+            data.add(incoming);
+            bytesRead.addAndGet(incoming.readableBytes());
+        }
+
+        @Override
+        public void onTransportClosed() {
+            LOG.debug("Transport reports that it has closed.");
+            transportClosed = true;
+        }
+
+        @Override
+        public void onTransportError(Throwable cause) {
+            LOG.debug("Transport error caught: {}", cause.getMessage());
+            exceptions.add(cause);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: Add some more framework to support ssl transports over Netty.

Posted by ta...@apache.org.
Add some more framework to support ssl transports over Netty.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9d2ed0d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9d2ed0d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9d2ed0d6

Branch: refs/heads/master
Commit: 9d2ed0d668b38245f02282279d0e1c223bf65610
Parents: 1098104
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jan 22 17:38:47 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jan 22 17:38:47 2015 -0500

----------------------------------------------------------------------
 .../jms/transports/TransportSslOptions.java     |  40 ++++-
 .../qpid/jms/transports/TransportSupport.java   | 160 +++++++++++++++++++
 .../jms/transports/netty/NettySslTransport.java |  14 ++
 .../jms/transports/netty/NettyTcpTransport.java |  14 +-
 .../jms/transports/TransportOptionsTest.java    |  87 ++++++++++
 .../jms/transports/TransportSslOptionsTest.java | 121 ++++++++++++++
 .../jms/transports/TransportSupportTest.java    |  81 ++++++++++
 .../src/test/resources/broker-jks.keystore      | Bin 0 -> 2102 bytes
 .../src/test/resources/broker-jks.truststore    | Bin 0 -> 1589 bytes
 .../src/test/resources/client-jks.keystore      | Bin 0 -> 2097 bytes
 .../src/test/resources/client-jks.truststore    | Bin 0 -> 1592 bytes
 11 files changed, 508 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
index f169c78..17bf78d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
@@ -16,6 +16,7 @@
  */
 package org.apache.qpid.jms.transports;
 
+
 /**
  * Holds the defined SSL options for connections that operate over a secure
  * transport.  Options are read from the environment and can be overridden by
@@ -23,8 +24,10 @@ package org.apache.qpid.jms.transports;
  */
 public class TransportSslOptions extends TransportOptions {
 
-    private static final String[] DEFAULT_ENABLED_PROTOCOLS = {"SSLv2Hello", "TLSv1", "TLSv1.1", "TLSv1.2"};
-    private static final String DEFAULT_STORE_TYPE = "JKS";
+    public static final String[] DEFAULT_ENABLED_PROTOCOLS = {"SSLv2Hello", "TLSv1", "TLSv1.1", "TLSv1.2"};
+    public static final String DEFAULT_STORE_TYPE = "jks";
+    public static final boolean DEFAULT_TRUST_ALL = false;
+    public static final boolean DEFAULT_VERIFY_HOST = false;
 
     public static final TransportSslOptions INSTANCE = new TransportSslOptions();
 
@@ -36,6 +39,9 @@ public class TransportSslOptions extends TransportOptions {
     private String[] enabledCipherSuites;
     private String[] enabledProtocols = DEFAULT_ENABLED_PROTOCOLS;
 
+    private boolean trustAll = DEFAULT_TRUST_ALL;
+    private boolean verifyHost = DEFAULT_VERIFY_HOST;
+
     static {
         INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
         INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
@@ -145,6 +151,34 @@ public class TransportSslOptions extends TransportOptions {
         this.enabledProtocols = enabledProtocols;
     }
 
+    /**
+     * @return the trustAll
+     */
+    public boolean isTrustAll() {
+        return trustAll;
+    }
+
+    /**
+     * @param trustAll the trustAll to set
+     */
+    public void setTrustAll(boolean trustAll) {
+        this.trustAll = trustAll;
+    }
+
+    /**
+     * @return the verifyHost
+     */
+    public boolean isVerifyHost() {
+        return verifyHost;
+    }
+
+    /**
+     * @param verifyHost the verifyHost to set
+     */
+    public void setVerifyHost(boolean verifyHost) {
+        this.verifyHost = verifyHost;
+    }
+
     @Override
     public TransportSslOptions clone() {
         return copyOptions(new TransportSslOptions());
@@ -160,6 +194,8 @@ public class TransportSslOptions extends TransportOptions {
         copy.setStoreType(getStoreType());
         copy.setEnabledCipherSuites(getEnabledCipherSuites());
         copy.setEnabledProtocols(getEnabledProtocols());
+        copy.setTrustAll(isTrustAll());
+        copy.setVerifyHost(isVerifyHost());
 
         return copy;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java
new file mode 100644
index 0000000..e216c2d
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import io.netty.handler.ssl.SslHandler;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static class that provides various utility methods used by Transport implementations.
+ */
+public class TransportSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransportSupport.class);
+
+    /**
+     * Creates a Netty SslHandler instance for use in Transports that require
+     * an SSL encoder / decoder.
+     *
+     * @param options
+     *        The SSL options object to build the SslHandler instance from.
+     *
+     * @return a new SslHandler that is configured from the given options.
+     *
+     * @throws Exception if an error occurs while creating the SslHandler instance.
+     */
+    public static SslHandler createSslHandler(TransportSslOptions options) throws Exception {
+        return new SslHandler(createSslEngine(createSslContext(options), options));
+    }
+
+    public static SSLContext createSslContext(TransportSslOptions options) throws Exception {
+        try {
+            SSLContext context = SSLContext.getInstance("TLS");
+            KeyManager[] keyMgrs = loadKeyManagers(options);
+
+            TrustManager[] trustManagers;
+            if (options.isTrustAll()) {
+                trustManagers = new TrustManager[] { createTrustAllTrustManager() };
+            } else {
+                trustManagers = loadTrustManagers(options);
+            }
+
+            context.init(keyMgrs, trustManagers, new SecureRandom());
+            return context;
+        } catch (Exception e) {
+            LOG.error("Failed to create SSLContext: {}", e, e);
+            throw e;
+        }
+    }
+
+    public static SSLEngine createSslEngine(SSLContext context, TransportSslOptions options) throws Exception {
+        SSLEngine engine = context.createSSLEngine();
+        engine.setEnabledProtocols(options.getEnabledProtocols());
+        engine.setUseClientMode(true);
+
+        if (options.isVerifyHost()) {
+            SSLParameters sslParameters = engine.getSSLParameters();
+            sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); // NOTE(review): the engine above is created without a peer host/port, so there may be no host name to verify against -- confirm
+            engine.setSSLParameters(sslParameters);
+        }
+
+        return engine;
+    }
+
+    private static TrustManager[] loadTrustManagers(TransportSslOptions options) throws Exception {
+        if (options.getTrustStoreLocation() == null) {
+            return null; // no trust store configured; the caller passes this null straight to SSLContext.init(...)
+        }
+
+        TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+        // Read the trust store settings here, not the key store settings that loadKeyManagers uses.
+        String storeLocation = options.getTrustStoreLocation();
+        String storePassword = options.getTrustStorePassword();
+        String storeType = options.getStoreType();
+
+        LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType);
+
+        KeyStore trustStore = loadStore(storeLocation, storePassword, storeType);
+        fact.init(trustStore);
+
+        return fact.getTrustManagers();
+    }
+
+    private static KeyManager[] loadKeyManagers(TransportSslOptions options) throws Exception {
+        if (options.getKeyStoreLocation() == null) {
+            return null;
+        }
+
+        KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+
+        String storeLocation = options.getKeyStoreLocation();
+        String storePassword = options.getKeyStorePassword();
+        String storeType = options.getStoreType();
+
+        LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
+
+        KeyStore keyStore = loadStore(storeLocation, storePassword, storeType);
+        fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null);
+
+        return fact.getKeyManagers();
+    }
+
+    private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception {
+        KeyStore store = KeyStore.getInstance(storeType);
+        try (InputStream in = new FileInputStream(new File(storePath));) {
+            store.load(in, password != null ? password.toCharArray() : null);
+        }
+
+        return store;
+    }
+
+    private static TrustManager createTrustAllTrustManager() {
+        return new X509TrustManager() {
+            @Override
+            public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            }
+
+            @Override
+            public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            }
+
+            @Override
+            public X509Certificate[] getAcceptedIssuers() {
+                return new X509Certificate[0];
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
index 3a959dc..46d1a94 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
@@ -16,10 +16,14 @@
  */
 package org.apache.qpid.jms.transports.netty;
 
+import io.netty.channel.Channel;
+
 import java.net.URI;
 
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportSslOptions;
+import org.apache.qpid.jms.transports.TransportSupport;
 
 /**
  * Extends the Netty based TCP transport to add SSL support.
@@ -52,4 +56,14 @@ public class NettySslTransport extends NettyTcpTransport {
         super(listener, remoteLocation, options);
     }
 
+    @Override
+    protected void configureChannel(Channel channel) throws Exception {
+        channel.pipeline().addLast(TransportSupport.createSslHandler(getSslOptions()));
+
+        super.configureChannel(channel);
+    }
+
+    private TransportSslOptions getSslOptions() {
+        return (TransportSslOptions) options;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index f881481..a958e45 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -48,12 +48,12 @@ public class NettyTcpTransport implements Transport {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
 
-    private Bootstrap bootstrap;
-    private EventLoopGroup group;
-    private Channel channel;
-    private TransportListener listener;
-    private TransportOptions options;
-    private final URI remote;
+    protected Bootstrap bootstrap;
+    protected EventLoopGroup group;
+    protected Channel channel;
+    protected TransportListener listener;
+    protected TransportOptions options;
+    protected final URI remote;
 
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
@@ -193,7 +193,7 @@ public class NettyTcpTransport implements Transport {
         }
     }
 
-    protected void configureChannel(Channel channel) {
+    protected void configureChannel(Channel channel) throws Exception {
         channel.pipeline().addLast(new NettyTcpTransportHandler());
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
new file mode 100644
index 0000000..ecc51e5
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.junit.Test;
+
+/**
+ * Test for class TransportOptions
+ */
+public class TransportOptionsTest extends QpidJmsTestCase {
+
+    public static final int TEST_SEND_BUFFER_SIZE = 128 * 1024;
+    public static final int TEST_RECEIVE_BUFFER_SIZE = TEST_SEND_BUFFER_SIZE;
+    public static final int TEST_TRAFFIC_CLASS = 1;
+    public static final boolean TEST_TCP_NO_DELAY = false;
+    public static final boolean TEST_TCP_KEEP_ALIVE = true;
+    public static final int TEST_SO_LINGER = Short.MAX_VALUE;
+    public static final int TEST_SO_TIMEOUT = 10;
+    public static final int TEST_CONNECT_TIMEOUT = 90000;
+
+    @Test
+    public void testCreate() {
+        TransportOptions options = new TransportOptions();
+
+        assertEquals(TransportOptions.DEFAULT_TCP_NO_DELAY, options.isTcpNoDelay());
+    }
+
+    @Test
+    public void testOptions() {
+        TransportOptions options = createNonDefaultOptions();
+
+        assertEquals(TEST_SEND_BUFFER_SIZE, options.getSendBufferSize());
+        assertEquals(TEST_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
+        assertEquals(TEST_TRAFFIC_CLASS, options.getTrafficClass());
+        assertEquals(TEST_TCP_NO_DELAY, options.isTcpNoDelay());
+        assertEquals(TEST_TCP_KEEP_ALIVE, options.isTcpKeepAlive());
+        assertEquals(TEST_SO_LINGER, options.getSoLinger());
+        assertEquals(TEST_SO_TIMEOUT, options.getSoTimeout());
+        assertEquals(TEST_CONNECT_TIMEOUT, options.getConnectTimeout());
+    }
+
+    @Test
+    public void testClone() {
+        TransportOptions options = createNonDefaultOptions().clone();
+
+        assertEquals(TEST_SEND_BUFFER_SIZE, options.getSendBufferSize());
+        assertEquals(TEST_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
+        assertEquals(TEST_TRAFFIC_CLASS, options.getTrafficClass());
+        assertEquals(TEST_TCP_NO_DELAY, options.isTcpNoDelay());
+        assertEquals(TEST_TCP_KEEP_ALIVE, options.isTcpKeepAlive());
+        assertEquals(TEST_SO_LINGER, options.getSoLinger());
+        assertEquals(TEST_SO_TIMEOUT, options.getSoTimeout());
+        assertEquals(TEST_CONNECT_TIMEOUT, options.getConnectTimeout());
+    }
+
+    private TransportOptions createNonDefaultOptions() {
+        TransportOptions options = new TransportOptions();
+
+        options.setSendBufferSize(TEST_SEND_BUFFER_SIZE);
+        options.setReceiveBufferSize(TEST_RECEIVE_BUFFER_SIZE);
+        options.setTrafficClass(TEST_TRAFFIC_CLASS);
+        options.setTcpNoDelay(TEST_TCP_NO_DELAY);
+        options.setTcpKeepAlive(TEST_TCP_KEEP_ALIVE);
+        options.setSoLinger(TEST_SO_LINGER);
+        options.setSoTimeout(TEST_SO_TIMEOUT);
+        options.setConnectTimeout(TEST_CONNECT_TIMEOUT);
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java
new file mode 100644
index 0000000..d58fde8
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.junit.Test;
+
+/**
+ * Test for class TransportSslOptions
+ */
+public class TransportSslOptionsTest extends QpidJmsTestCase {
+
+    public static final String PASSWORD = "password";
+    public static final String CLINET_KEYSTORE = "src/test/resources/client-jks.keystore";
+    public static final String CLINET_TRUSTSTORE = "src/test/resources/client-jks.truststore";
+    public static final String KEYSTORE_TYPE = "jks";
+    public static final boolean TRUST_ALL = true;
+    public static final boolean VERIFY_HOST = true;
+
+    public static final int TEST_SEND_BUFFER_SIZE = 128 * 1024;
+    public static final int TEST_RECEIVE_BUFFER_SIZE = TEST_SEND_BUFFER_SIZE;
+    public static final int TEST_TRAFFIC_CLASS = 1;
+    public static final boolean TEST_TCP_NO_DELAY = false;
+    public static final boolean TEST_TCP_KEEP_ALIVE = true;
+    public static final int TEST_SO_LINGER = Short.MAX_VALUE;
+    public static final int TEST_SO_TIMEOUT = 10;
+    public static final int TEST_CONNECT_TIMEOUT = 90000;
+
+    /** A new instance reports the default trust-all flag and store type, and null store settings. */
+    @Test
+    public void testCreate() {
+        TransportSslOptions options = new TransportSslOptions();
+
+        assertEquals(TransportSslOptions.DEFAULT_TRUST_ALL, options.isTrustAll());
+        assertEquals(TransportSslOptions.DEFAULT_STORE_TYPE, options.getStoreType());
+
+        assertNull(options.getKeyStoreLocation());
+        assertNull(options.getKeyStorePassword());
+        assertNull(options.getTrustStoreLocation());
+        assertNull(options.getTrustStorePassword());
+    }
+
+    /** Values applied through the setters are reported back by the matching getters. */
+    @Test
+    public void testCreateAndConfigure() {
+        assertConfigured(createSslOptions());
+    }
+
+    /** A clone reports the same configured values as the instance it was cloned from. */
+    @Test
+    public void testClone() {
+        assertConfigured(createSslOptions().clone());
+    }
+
+    /**
+     * Asserts that the given options report the values applied by {@link #createSslOptions()}.
+     *
+     * @param options the instance to inspect
+     */
+    private void assertConfigured(TransportSslOptions options) {
+        // TCP / socket settings.
+        assertEquals(TEST_SEND_BUFFER_SIZE, options.getSendBufferSize());
+        assertEquals(TEST_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
+        assertEquals(TEST_TRAFFIC_CLASS, options.getTrafficClass());
+        assertEquals(TEST_TCP_NO_DELAY, options.isTcpNoDelay());
+        assertEquals(TEST_TCP_KEEP_ALIVE, options.isTcpKeepAlive());
+        assertEquals(TEST_SO_LINGER, options.getSoLinger());
+        assertEquals(TEST_SO_TIMEOUT, options.getSoTimeout());
+        assertEquals(TEST_CONNECT_TIMEOUT, options.getConnectTimeout());
+
+        // Key store and trust store settings.
+        assertEquals(CLINET_KEYSTORE, options.getKeyStoreLocation());
+        assertEquals(PASSWORD, options.getKeyStorePassword());
+        assertEquals(CLINET_TRUSTSTORE, options.getTrustStoreLocation());
+        assertEquals(PASSWORD, options.getTrustStorePassword());
+        assertEquals(KEYSTORE_TYPE, options.getStoreType());
+        // trustAll is set by the fixture, so a setter or clone() that drops it must fail here.
+        assertEquals(TRUST_ALL, options.isTrustAll());
+        // NOTE(review): verifyHost is set by the fixture too but not asserted - confirm its getter and add it.
+    }
+
+    /** Builds an instance with the SSL and socket values defined by this class's constants. */
+    private TransportSslOptions createSslOptions() {
+        TransportSslOptions options = new TransportSslOptions();
+        options.setKeyStoreLocation(CLINET_KEYSTORE);
+        options.setTrustStoreLocation(CLINET_TRUSTSTORE);
+        options.setKeyStorePassword(PASSWORD);
+        options.setTrustStorePassword(PASSWORD);
+        options.setStoreType(KEYSTORE_TYPE);
+        options.setTrustAll(TRUST_ALL);
+        options.setVerifyHost(VERIFY_HOST);
+
+        options.setSendBufferSize(TEST_SEND_BUFFER_SIZE);
+        options.setReceiveBufferSize(TEST_RECEIVE_BUFFER_SIZE);
+        options.setTrafficClass(TEST_TRAFFIC_CLASS);
+        options.setTcpNoDelay(TEST_TCP_NO_DELAY);
+        options.setTcpKeepAlive(TEST_TCP_KEEP_ALIVE);
+        options.setSoLinger(TEST_SO_LINGER);
+        options.setSoTimeout(TEST_SO_TIMEOUT);
+        options.setConnectTimeout(TEST_CONNECT_TIMEOUT);
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java
new file mode 100644
index 0000000..1f26e32
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.junit.Test;
+
+/**
+ * Tests for the TransportSupport class.
+ */
+public class TransportSupportTest extends QpidJmsTestCase {
+
+    public static final String PASSWORD = "password";
+    public static final String BROKER_KEYSTORE = "src/test/resources/broker-jks.keystore";
+    public static final String BROKER_TRUSTSTORE = "src/test/resources/broker-jks.truststore";
+    public static final String CLINET_KEYSTORE = "src/test/resources/client-jks.keystore";
+    public static final String CLINET_TRUSTSTORE = "src/test/resources/client-jks.truststore";
+    public static final String KEYSTORE_TYPE = "jks";
+
+    /** The context created from the client store options is non-null and reports "TLS". */
+    @Test
+    public void testCreateSslContext() throws Exception {
+        final SSLContext sslContext = TransportSupport.createSslContext(createSslOptions());
+
+        assertNotNull(sslContext);
+        assertEquals("TLS", sslContext.getProtocol());
+    }
+
+    /** The engine created from that context enables the default protocols, in any order. */
+    @Test
+    public void testCreateSslEngine() throws Exception {
+        final TransportSslOptions sslOptions = createSslOptions();
+        final SSLContext sslContext = TransportSupport.createSslContext(sslOptions);
+        assertNotNull(sslContext);
+
+        final SSLEngine sslEngine = TransportSupport.createSslEngine(sslContext, sslOptions);
+        assertNotNull(sslEngine);
+
+        // Order-insensitive comparison of the enabled protocols against the defaults.
+        final List<String> enabled = Arrays.asList(sslEngine.getEnabledProtocols());
+        final Object[] expected = Arrays.asList(TransportSslOptions.DEFAULT_ENABLED_PROTOCOLS).toArray();
+        assertThat(enabled, containsInAnyOrder(expected));
+    }
+
+    /** Builds SSL options that reference the client key store and trust store. */
+    private TransportSslOptions createSslOptions() {
+        final TransportSslOptions sslOptions = new TransportSslOptions();
+
+        sslOptions.setKeyStoreLocation(CLINET_KEYSTORE);
+        sslOptions.setTrustStoreLocation(CLINET_TRUSTSTORE);
+        sslOptions.setKeyStorePassword(PASSWORD);
+        sslOptions.setTrustStorePassword(PASSWORD);
+
+        return sslOptions;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/broker-jks.keystore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/broker-jks.keystore b/qpid-jms-client/src/test/resources/broker-jks.keystore
new file mode 100644
index 0000000..e23f1a9
Binary files /dev/null and b/qpid-jms-client/src/test/resources/broker-jks.keystore differ

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/broker-jks.truststore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/broker-jks.truststore b/qpid-jms-client/src/test/resources/broker-jks.truststore
new file mode 100644
index 0000000..b339423
Binary files /dev/null and b/qpid-jms-client/src/test/resources/broker-jks.truststore differ

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/client-jks.keystore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/client-jks.keystore b/qpid-jms-client/src/test/resources/client-jks.keystore
new file mode 100644
index 0000000..e0474af
Binary files /dev/null and b/qpid-jms-client/src/test/resources/client-jks.keystore differ

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/client-jks.truststore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/client-jks.truststore b/qpid-jms-client/src/test/resources/client-jks.truststore
new file mode 100644
index 0000000..f9ab34f
Binary files /dev/null and b/qpid-jms-client/src/test/resources/client-jks.truststore differ


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org