You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/22 23:52:28 UTC
[1/2] qpid-jms git commit: move these to a matching test package
Repository: qpid-jms
Updated Branches:
refs/heads/master 3d31992d5 -> 9d2ed0d66
move these to a matching test package
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/10981041
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/10981041
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/10981041
Branch: refs/heads/master
Commit: 10981041d41b2d7455813f90008d5f8efb98205b
Parents: 3d31992
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jan 22 15:12:00 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jan 22 15:12:00 2015 -0500
----------------------------------------------------------------------
.../qpid/jms/test/netty/NettyEchoServer.java | 155 --------
.../jms/test/netty/NettyTcpTransportTest.java | 353 -------------------
.../jms/transports/netty/NettyEchoServer.java | 155 ++++++++
.../transports/netty/NettyTcpTransportTest.java | 353 +++++++++++++++++++
4 files changed, 508 insertions(+), 508 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
deleted file mode 100644
index 42bc535..0000000
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.test.netty;
-
-import io.netty.bootstrap.ServerBootstrap;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.logging.LogLevel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.util.concurrent.Future;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.net.ServerSocketFactory;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Simple Netty Server used to echo all data.
- */
-public class NettyEchoServer implements AutoCloseable {
- private static final Logger LOG = LoggerFactory.getLogger(NettyEchoServer.class);
-
- static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
- static final int TIMEOUT = 5000;
-
- private EventLoopGroup bossGroup;
- private EventLoopGroup workerGroup;
- private Channel serverChannel;
- private int serverPort;
-
- private final AtomicBoolean started = new AtomicBoolean();
-
- public void start() throws Exception {
-
- if (started.compareAndSet(false, true)) {
-
- // Configure the server.
- bossGroup = new NioEventLoopGroup(1);
- workerGroup = new NioEventLoopGroup();
-
- ServerBootstrap server = new ServerBootstrap();
- server.group(bossGroup, workerGroup);
- server.channel(NioServerSocketChannel.class);
- server.option(ChannelOption.SO_BACKLOG, 100);
- server.handler(new LoggingHandler(LogLevel.INFO));
- server.childHandler(new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(Channel ch) throws Exception {
- ch.pipeline().addLast(new EchoServerHandler());
- }
- });
-
- // Start the server.
- serverChannel = server.bind(getServerPort()).sync().channel();
- }
- }
-
- public void stop() throws InterruptedException {
- if (started.compareAndSet(true, false)) {
- try {
- LOG.info("Syncing channel close");
- serverChannel.close().sync();
- } catch (InterruptedException e) {
- }
- // Shut down all event loops to terminate all threads.
- LOG.info("Shutting down boss group");
- Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
- LOG.info("Shutting down worker group");
- Future<?> workerFuture = workerGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
-
- LOG.info("Awaiting boss group shutdown");
- boolean bossShutdown = bossFuture.await(TIMEOUT + 500);
-
- LOG.info("Awaiting worker group shutdown");
- boolean workerShutdown = workerFuture.await(TIMEOUT + 500);
-
- if (!bossShutdown) {
- throw new RuntimeException("Failed to shut down bossGroup in allotted time");
- }
- if (!workerShutdown) {
- throw new RuntimeException("Failed to shut down workerGroup in allotted time");
- }
- }
- }
-
- @Override
- public void close() throws InterruptedException {
- stop();
- }
-
- public int getServerPort() {
- if (serverPort == 0) {
- ServerSocket ss = null;
- try {
- ss = ServerSocketFactory.getDefault().createServerSocket(0);
- serverPort = ss.getLocalPort();
- } catch (IOException e) { // revert back to default
- serverPort = PORT;
- } finally {
- try {
- if (ss != null ) {
- ss.close();
- }
- } catch (IOException e) { // ignore
- }
- }
- }
- return serverPort;
- }
-
- private class EchoServerHandler extends ChannelInboundHandlerAdapter {
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- ctx.write(msg);
- }
-
- @Override
- public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.flush();
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- // Close the connection when an exception is raised.
- cause.printStackTrace();
- ctx.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
deleted file mode 100644
index 448b1be..0000000
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.test.netty;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.qpid.jms.test.QpidJmsTestCase;
-import org.apache.qpid.jms.test.Wait;
-import org.apache.qpid.jms.transports.TransportListener;
-import org.apache.qpid.jms.transports.TransportOptions;
-import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test basic functionality of the Netty based TCP transport.
- */
-public class NettyTcpTransportTest extends QpidJmsTestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
-
- private static final int SEND_BYTE_COUNT = 1024;
-
- private boolean transportClosed;
- private final List<Throwable> exceptions = new ArrayList<Throwable>();
- private final List<ByteBuf> data = new ArrayList<ByteBuf>();
- private final AtomicInteger bytesRead = new AtomicInteger();
-
- private final TransportListener testListener = new NettyTransportListener();
- private final TransportOptions testOptions = new TransportOptions();
-
- @Test(timeout = 60 * 1000)
- public void testConnectToServer() throws Exception {
- try (NettyEchoServer server = new NettyEchoServer()) {
- server.start();
-
- int port = server.getServerPort();
- URI serverLocation = new URI("tcp://localhost:" + port);
-
- NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
- try {
- transport.connect();
- LOG.info("Connected to test server.");
- } catch (Exception e) {
- fail("Should have connected to the server");
- }
-
- assertTrue(transport.isConnected());
-
- transport.close();
- }
-
- assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
- assertTrue(exceptions.isEmpty());
- assertTrue(data.isEmpty());
- }
-
- @Test(timeout = 60 * 1000)
- public void testMultipleConnectionsToServer() throws Exception {
- final int CONNECTION_COUNT = 25;
-
- try (NettyEchoServer server = new NettyEchoServer()) {
- server.start();
-
- int port = server.getServerPort();
- URI serverLocation = new URI("tcp://localhost:" + port);
-
- List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
-
- for (int i = 0; i < CONNECTION_COUNT; ++i) {
- NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
- try {
- transport.connect();
- assertTrue(transport.isConnected());
- LOG.info("Connected to test server.");
- transports.add(transport);
- } catch (Exception e) {
- fail("Should have connected to the server");
- }
- }
-
- for (NettyTcpTransport transport : transports) {
- transport.close();
- }
- }
-
- assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
- assertTrue(exceptions.isEmpty());
- assertTrue(data.isEmpty());
- }
-
- @Test(timeout = 60 * 1000)
- public void testMultipleConnectionsSendReceive() throws Exception {
- final int CONNECTION_COUNT = 25;
- final int FRAME_SIZE = 8;
-
- ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
- for (int i = 0; i < 8; ++i) {
- sendBuffer.writeByte('A');
- }
-
- try (NettyEchoServer server = new NettyEchoServer()) {
- server.start();
-
- int port = server.getServerPort();
- URI serverLocation = new URI("tcp://localhost:" + port);
-
- List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
-
- for (int i = 0; i < CONNECTION_COUNT; ++i) {
- NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
- try {
- transport.connect();
- transport.send(sendBuffer.copy());
- transports.add(transport);
- } catch (Exception e) {
- fail("Should have connected to the server");
- }
- }
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT));
- return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT);
- }
- }));
-
- for (NettyTcpTransport transport : transports) {
- transport.close();
- }
- }
-
- assertTrue(exceptions.isEmpty());
- }
-
- @Test(timeout = 60 * 1000)
- public void testDetectServerClose() throws Exception {
- NettyTcpTransport transport = null;
-
- try (NettyEchoServer server = new NettyEchoServer()) {
- server.start();
-
- int port = server.getServerPort();
- URI serverLocation = new URI("tcp://localhost:" + port);
-
- transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
- try {
- transport.connect();
- LOG.info("Connected to test server.");
- } catch (Exception e) {
- fail("Should have connected to the server");
- }
-
- assertTrue(transport.isConnected());
-
- server.close();
- }
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return transportClosed;
- }
- }));
- assertTrue(exceptions.isEmpty());
- assertTrue(data.isEmpty());
- assertFalse(transport.isConnected());
-
- try {
- transport.close();
- } catch (Exception ex) {
- fail("Close of a disconnect transport should not generate errors");
- }
- }
-
- @Test(timeout = 60 * 1000)
- public void testDataSentIsReceived() throws Exception {
- try (NettyEchoServer server = new NettyEchoServer()) {
- server.start();
-
- int port = server.getServerPort();
- URI serverLocation = new URI("tcp://localhost:" + port);
-
-
- NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
- try {
- transport.connect();
- LOG.info("Connected to test server.");
- } catch (Exception e) {
- fail("Should have connected to the server");
- }
-
- assertTrue(transport.isConnected());
-
- ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
- for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
- sendBuffer.writeByte('A');
- }
-
- transport.send(sendBuffer);
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return !data.isEmpty();
- }
- }));
-
- assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes());
-
- transport.close();
- }
-
- assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
- assertTrue(exceptions.isEmpty());
- }
-
-
- @Test(timeout = 60 * 1000)
- public void testMultipleDataPacketsSentAreReceived() throws Exception {
- doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1);
- }
-
- @Test(timeout = 60 * 1000)
- public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws Exception {
- doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10);
- }
-
- public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception {
-
- try (NettyEchoServer server = new NettyEchoServer()) {
- server.start();
-
- int port = server.getServerPort();
- URI serverLocation = new URI("tcp://localhost:" + port);
-
- NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
- try {
- transport.connect();
- LOG.info("Connected to test server.");
- } catch (Exception e) {
- fail("Should have connected to the server");
- }
-
- assertTrue(transport.isConnected());
-
- ByteBuf sendBuffer = Unpooled.buffer(byteCount);
- for (int i = 0; i < byteCount; ++i) {
- sendBuffer.writeByte('A');
- }
-
- for (int i = 0; i < iterations; ++i) {
- transport.send(sendBuffer.copy());
- }
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return bytesRead.get() == (byteCount * iterations);
- }
- }));
-
- transport.close();
- }
-
- assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
- assertTrue(exceptions.isEmpty());
- }
-
- @Test(timeout = 60 * 1000)
- public void testSendToClosedTransportFails() throws Exception {
- NettyTcpTransport transport = null;
-
- try (NettyEchoServer server = new NettyEchoServer()) {
- server.start();
-
- int port = server.getServerPort();
- URI serverLocation = new URI("tcp://localhost:" + port);
-
- transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
- try {
- transport.connect();
- LOG.info("Connected to test server.");
- } catch (Exception e) {
- fail("Should have connected to the server");
- }
-
- assertTrue(transport.isConnected());
-
- transport.close();
-
- ByteBuf sendBuffer = Unpooled.buffer(10);
- try {
- transport.send(sendBuffer);
- fail("Should throw on send of closed transport");
- } catch (IOException ex) {
- }
- }
- }
-
- private class NettyTransportListener implements TransportListener {
-
- @Override
- public void onData(ByteBuf incoming) {
- LOG.debug("Client has new incoming data of size: {}", incoming.readableBytes());
- data.add(incoming);
- bytesRead.addAndGet(incoming.readableBytes());
- }
-
- @Override
- public void onTransportClosed() {
- LOG.debug("Transport reports that it has closed.");
- transportClosed = true;
- }
-
- @Override
- public void onTransportError(Throwable cause) {
- LOG.debug("Transport error caught: {}", cause.getMessage());
- exceptions.add(cause);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
new file mode 100644
index 0000000..592ed1d
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyEchoServer.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.util.concurrent.Future;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple Netty Server used to echo all data.
+ */
+public class NettyEchoServer implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyEchoServer.class);
+
+ static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
+ static final int TIMEOUT = 5000;
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private Channel serverChannel;
+ private int serverPort;
+
+ private final AtomicBoolean started = new AtomicBoolean();
+
+ public void start() throws Exception {
+
+ if (started.compareAndSet(false, true)) {
+
+ // Configure the server.
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
+
+ ServerBootstrap server = new ServerBootstrap();
+ server.group(bossGroup, workerGroup);
+ server.channel(NioServerSocketChannel.class);
+ server.option(ChannelOption.SO_BACKLOG, 100);
+ server.handler(new LoggingHandler(LogLevel.INFO));
+ server.childHandler(new ChannelInitializer<Channel>() {
+ @Override
+ public void initChannel(Channel ch) throws Exception {
+ ch.pipeline().addLast(new EchoServerHandler());
+ }
+ });
+
+ // Start the server.
+ serverChannel = server.bind(getServerPort()).sync().channel();
+ }
+ }
+
+ public void stop() throws InterruptedException {
+ if (started.compareAndSet(true, false)) {
+ try {
+ LOG.info("Syncing channel close");
+ serverChannel.close().sync();
+ } catch (InterruptedException e) {
+ }
+ // Shut down all event loops to terminate all threads.
+ LOG.info("Shutting down boss group");
+ Future<?> bossFuture = bossGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
+ LOG.info("Shutting down worker group");
+ Future<?> workerFuture = workerGroup.shutdownGracefully(10, TIMEOUT, TimeUnit.MILLISECONDS);
+
+ LOG.info("Awaiting boss group shutdown");
+ boolean bossShutdown = bossFuture.await(TIMEOUT + 500);
+
+ LOG.info("Awaiting worker group shutdown");
+ boolean workerShutdown = workerFuture.await(TIMEOUT + 500);
+
+ if (!bossShutdown) {
+ throw new RuntimeException("Failed to shut down bossGroup in allotted time");
+ }
+ if (!workerShutdown) {
+ throw new RuntimeException("Failed to shut down workerGroup in allotted time");
+ }
+ }
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ stop();
+ }
+
+ public int getServerPort() {
+ if (serverPort == 0) {
+ ServerSocket ss = null;
+ try {
+ ss = ServerSocketFactory.getDefault().createServerSocket(0);
+ serverPort = ss.getLocalPort();
+ } catch (IOException e) { // revert back to default
+ serverPort = PORT;
+ } finally {
+ try {
+ if (ss != null ) {
+ ss.close();
+ }
+ } catch (IOException e) { // ignore
+ }
+ }
+ }
+ return serverPort;
+ }
+
+ private class EchoServerHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ctx.write(msg);
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ ctx.flush();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ // Close the connection when an exception is raised.
+ cause.printStackTrace();
+ ctx.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/10981041/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
new file mode 100644
index 0000000..e65a1b7
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports.netty;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.netty.NettyTcpTransport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test basic functionality of the Netty based TCP transport.
+ */
+public class NettyTcpTransportTest extends QpidJmsTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
+
+ private static final int SEND_BYTE_COUNT = 1024;
+
+ private boolean transportClosed;
+ private final List<Throwable> exceptions = new ArrayList<Throwable>();
+ private final List<ByteBuf> data = new ArrayList<ByteBuf>();
+ private final AtomicInteger bytesRead = new AtomicInteger();
+
+ private final TransportListener testListener = new NettyTransportListener();
+ private final TransportOptions testOptions = new TransportOptions();
+
+ @Test(timeout = 60 * 1000)
+ public void testConnectToServer() throws Exception {
+ try (NettyEchoServer server = new NettyEchoServer()) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+ try {
+ transport.connect();
+ LOG.info("Connected to test server.");
+ } catch (Exception e) {
+ fail("Should have connected to the server");
+ }
+
+ assertTrue(transport.isConnected());
+
+ transport.close();
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testMultipleConnectionsToServer() throws Exception {
+ final int CONNECTION_COUNT = 25;
+
+ try (NettyEchoServer server = new NettyEchoServer()) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
+
+ for (int i = 0; i < CONNECTION_COUNT; ++i) {
+ NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+ try {
+ transport.connect();
+ assertTrue(transport.isConnected());
+ LOG.info("Connected to test server.");
+ transports.add(transport);
+ } catch (Exception e) {
+ fail("Should have connected to the server");
+ }
+ }
+
+ for (NettyTcpTransport transport : transports) {
+ transport.close();
+ }
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testMultipleConnectionsSendReceive() throws Exception {
+ final int CONNECTION_COUNT = 25;
+ final int FRAME_SIZE = 8;
+
+ ByteBuf sendBuffer = Unpooled.buffer(FRAME_SIZE);
+ for (int i = 0; i < 8; ++i) {
+ sendBuffer.writeByte('A');
+ }
+
+ try (NettyEchoServer server = new NettyEchoServer()) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ List<NettyTcpTransport> transports = new ArrayList<NettyTcpTransport>();
+
+ for (int i = 0; i < CONNECTION_COUNT; ++i) {
+ NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+ try {
+ transport.connect();
+ transport.send(sendBuffer.copy());
+ transports.add(transport);
+ } catch (Exception e) {
+ fail("Should have connected to the server");
+ }
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.debug("Checking completion: read {} expecting {}", bytesRead.get(), (FRAME_SIZE * CONNECTION_COUNT));
+ return bytesRead.get() == (FRAME_SIZE * CONNECTION_COUNT);
+ }
+ }));
+
+ for (NettyTcpTransport transport : transports) {
+ transport.close();
+ }
+ }
+
+ assertTrue(exceptions.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testDetectServerClose() throws Exception {
+ NettyTcpTransport transport = null;
+
+ try (NettyEchoServer server = new NettyEchoServer()) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+ try {
+ transport.connect();
+ LOG.info("Connected to test server.");
+ } catch (Exception e) {
+ fail("Should have connected to the server");
+ }
+
+ assertTrue(transport.isConnected());
+
+ server.close();
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return transportClosed;
+ }
+ }));
+ assertTrue(exceptions.isEmpty());
+ assertTrue(data.isEmpty());
+ assertFalse(transport.isConnected());
+
+ try {
+ transport.close();
+ } catch (Exception ex) {
+ fail("Close of a disconnect transport should not generate errors");
+ }
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testDataSentIsReceived() throws Exception {
+ try (NettyEchoServer server = new NettyEchoServer()) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+
+ NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+ try {
+ transport.connect();
+ LOG.info("Connected to test server.");
+ } catch (Exception e) {
+ fail("Should have connected to the server");
+ }
+
+ assertTrue(transport.isConnected());
+
+ ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
+ for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
+ sendBuffer.writeByte('A');
+ }
+
+ transport.send(sendBuffer);
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !data.isEmpty();
+ }
+ }));
+
+ assertEquals(SEND_BYTE_COUNT, data.get(0).readableBytes());
+
+ transport.close();
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ }
+
+
+ @Test(timeout = 60 * 1000)
+ public void testMultipleDataPacketsSentAreReceived() throws Exception {
+ doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 1);
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testMultipleDataPacketsSentAreReceivedRepeatedly() throws Exception {
+ doMultipleDataPacketsSentAndReceive(SEND_BYTE_COUNT, 10);
+ }
+
+ public void doMultipleDataPacketsSentAndReceive(final int byteCount, final int iterations) throws Exception {
+
+ try (NettyEchoServer server = new NettyEchoServer()) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+ try {
+ transport.connect();
+ LOG.info("Connected to test server.");
+ } catch (Exception e) {
+ fail("Should have connected to the server");
+ }
+
+ assertTrue(transport.isConnected());
+
+ ByteBuf sendBuffer = Unpooled.buffer(byteCount);
+ for (int i = 0; i < byteCount; ++i) {
+ sendBuffer.writeByte('A');
+ }
+
+ for (int i = 0; i < iterations; ++i) {
+ transport.send(sendBuffer.copy());
+ }
+
+ assertTrue(Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return bytesRead.get() == (byteCount * iterations);
+ }
+ }));
+
+ transport.close();
+ }
+
+ assertTrue(!transportClosed); // Normal shutdown does not trigger the event.
+ assertTrue(exceptions.isEmpty());
+ }
+
+ @Test(timeout = 60 * 1000)
+ public void testSendToClosedTransportFails() throws Exception {
+ NettyTcpTransport transport = null;
+
+ try (NettyEchoServer server = new NettyEchoServer()) {
+ server.start();
+
+ int port = server.getServerPort();
+ URI serverLocation = new URI("tcp://localhost:" + port);
+
+ transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+ try {
+ transport.connect();
+ LOG.info("Connected to test server.");
+ } catch (Exception e) {
+ fail("Should have connected to the server");
+ }
+
+ assertTrue(transport.isConnected());
+
+ transport.close();
+
+ ByteBuf sendBuffer = Unpooled.buffer(10);
+ try {
+ transport.send(sendBuffer);
+ fail("Should throw on send of closed transport");
+ } catch (IOException ex) {
+ }
+ }
+ }
+
+ private class NettyTransportListener implements TransportListener {
+
+ @Override
+ public void onData(ByteBuf incoming) {
+ LOG.debug("Client has new incoming data of size: {}", incoming.readableBytes());
+ data.add(incoming);
+ bytesRead.addAndGet(incoming.readableBytes());
+ }
+
+ @Override
+ public void onTransportClosed() {
+ LOG.debug("Transport reports that it has closed.");
+ transportClosed = true;
+ }
+
+ @Override
+ public void onTransportError(Throwable cause) {
+ LOG.debug("Transport error caught: {}", cause.getMessage());
+ exceptions.add(cause);
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: Add some more framework to support ssl
transports over Netty.
Posted by ta...@apache.org.
Add some more framework to support ssl transports over Netty.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9d2ed0d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9d2ed0d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9d2ed0d6
Branch: refs/heads/master
Commit: 9d2ed0d668b38245f02282279d0e1c223bf65610
Parents: 1098104
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Jan 22 17:38:47 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jan 22 17:38:47 2015 -0500
----------------------------------------------------------------------
.../jms/transports/TransportSslOptions.java | 40 ++++-
.../qpid/jms/transports/TransportSupport.java | 160 +++++++++++++++++++
.../jms/transports/netty/NettySslTransport.java | 14 ++
.../jms/transports/netty/NettyTcpTransport.java | 14 +-
.../jms/transports/TransportOptionsTest.java | 87 ++++++++++
.../jms/transports/TransportSslOptionsTest.java | 121 ++++++++++++++
.../jms/transports/TransportSupportTest.java | 81 ++++++++++
.../src/test/resources/broker-jks.keystore | Bin 0 -> 2102 bytes
.../src/test/resources/broker-jks.truststore | Bin 0 -> 1589 bytes
.../src/test/resources/client-jks.keystore | Bin 0 -> 2097 bytes
.../src/test/resources/client-jks.truststore | Bin 0 -> 1592 bytes
11 files changed, 508 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
index f169c78..17bf78d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSslOptions.java
@@ -16,6 +16,7 @@
*/
package org.apache.qpid.jms.transports;
+
/**
* Holds the defined SSL options for connections that operate over a secure
* transport. Options are read from the environment and can be overridden by
@@ -23,8 +24,10 @@ package org.apache.qpid.jms.transports;
*/
public class TransportSslOptions extends TransportOptions {
- private static final String[] DEFAULT_ENABLED_PROTOCOLS = {"SSLv2Hello", "TLSv1", "TLSv1.1", "TLSv1.2"};
- private static final String DEFAULT_STORE_TYPE = "JKS";
+ public static final String[] DEFAULT_ENABLED_PROTOCOLS = {"SSLv2Hello", "TLSv1", "TLSv1.1", "TLSv1.2"};
+ public static final String DEFAULT_STORE_TYPE = "jks";
+ public static final boolean DEFAULT_TRUST_ALL = false;
+ public static final boolean DEFAULT_VERIFY_HOST = false;
public static final TransportSslOptions INSTANCE = new TransportSslOptions();
@@ -36,6 +39,9 @@ public class TransportSslOptions extends TransportOptions {
private String[] enabledCipherSuites;
private String[] enabledProtocols = DEFAULT_ENABLED_PROTOCOLS;
+ private boolean trustAll = DEFAULT_TRUST_ALL;
+ private boolean verifyHost = DEFAULT_VERIFY_HOST;
+
static {
INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
@@ -145,6 +151,34 @@ public class TransportSslOptions extends TransportOptions {
this.enabledProtocols = enabledProtocols;
}
+ /**
+ * @return the trustAll
+ */
+ public boolean isTrustAll() {
+ return trustAll;
+ }
+
+ /**
+ * @param trustAll the trustAll to set
+ */
+ public void setTrustAll(boolean trustAll) {
+ this.trustAll = trustAll;
+ }
+
+ /**
+ * @return the verifyHost
+ */
+ public boolean isVerifyHost() {
+ return verifyHost;
+ }
+
+ /**
+ * @param verifyHost the verifyHost to set
+ */
+ public void setVerifyHost(boolean verifyHost) {
+ this.verifyHost = verifyHost;
+ }
+
@Override
public TransportSslOptions clone() {
return copyOptions(new TransportSslOptions());
@@ -160,6 +194,8 @@ public class TransportSslOptions extends TransportOptions {
copy.setStoreType(getStoreType());
copy.setEnabledCipherSuites(getEnabledCipherSuites());
copy.setEnabledProtocols(getEnabledProtocols());
+ copy.setTrustAll(isTrustAll());
+ copy.setVerifyHost(isVerifyHost());
return copy;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java
new file mode 100644
index 0000000..e216c2d
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportSupport.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import io.netty.handler.ssl.SslHandler;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static class that provides various utility methods used by Transport implementations.
+ */
+public class TransportSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TransportSupport.class);
+
+ /**
+ * Creates a Netty SslHandler instance for use in Transports that require
+ * an SSL encoder / decoder.
+ *
+ * @param options
+ * The SSL options object to build the SslHandler instance from.
+ *
+ * @return a new SslHandler that is configured from the given options.
+ *
+ * @throws Exception if an error occurs while creating the SslHandler instance.
+ */
+ public static SslHandler createSslHandler(TransportSslOptions options) throws Exception {
+ return new SslHandler(createSslEngine(createSslContext(options), options));
+ }
+
+ public static SSLContext createSslContext(TransportSslOptions options) throws Exception {
+ try {
+ SSLContext context = SSLContext.getInstance("TLS");
+ KeyManager[] keyMgrs = loadKeyManagers(options);
+
+ TrustManager[] trustManagers;
+ if (options.isTrustAll()) {
+ trustManagers = new TrustManager[] { createTrustAllTrustManager() };
+ } else {
+ trustManagers = loadTrustManagers(options);
+ }
+
+ context.init(keyMgrs, trustManagers, new SecureRandom());
+ return context;
+ } catch (Exception e) {
+ LOG.error("Failed to create SSLContext: {}", e, e);
+ throw e;
+ }
+ }
+
+ public static SSLEngine createSslEngine(SSLContext context, TransportSslOptions options) throws Exception {
+ SSLEngine engine = context.createSSLEngine();
+ engine.setEnabledProtocols(options.getEnabledProtocols());
+ engine.setUseClientMode(true);
+
+ if (options.isVerifyHost()) {
+ SSLParameters sslParameters = engine.getSSLParameters();
+ sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+ engine.setSSLParameters(sslParameters);
+ }
+
+ return engine;
+ }
+
+ private static TrustManager[] loadTrustManagers(TransportSslOptions options) throws Exception {
+ if (options.getTrustStoreLocation() == null) {
+ return null;
+ }
+
+ TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+
+ String storeLocation = options.getKeyStoreLocation();
+ String storePassword = options.getKeyStorePassword();
+ String storeType = options.getStoreType();
+
+ LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
+
+ KeyStore trustStore = loadStore(storeLocation, storePassword, storeType);
+ fact.init(trustStore);
+
+ return fact.getTrustManagers();
+ }
+
+ private static KeyManager[] loadKeyManagers(TransportSslOptions options) throws Exception {
+ if (options.getKeyStoreLocation() == null) {
+ return null;
+ }
+
+ KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+
+ String storeLocation = options.getKeyStoreLocation();
+ String storePassword = options.getKeyStorePassword();
+ String storeType = options.getStoreType();
+
+ LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
+
+ KeyStore keyStore = loadStore(storeLocation, storePassword, storeType);
+ fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null);
+
+ return fact.getKeyManagers();
+ }
+
+ private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception {
+ KeyStore store = KeyStore.getInstance(storeType);
+ try (InputStream in = new FileInputStream(new File(storePath));) {
+ store.load(in, password != null ? password.toCharArray() : null);
+ }
+
+ return store;
+ }
+
+ private static TrustManager createTrustAllTrustManager() {
+ return new X509TrustManager() {
+ @Override
+ public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
index 3a959dc..46d1a94 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransport.java
@@ -16,10 +16,14 @@
*/
package org.apache.qpid.jms.transports.netty;
+import io.netty.channel.Channel;
+
import java.net.URI;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.transports.TransportSslOptions;
+import org.apache.qpid.jms.transports.TransportSupport;
/**
* Extends the Netty based TCP transport to add SSL support.
@@ -52,4 +56,14 @@ public class NettySslTransport extends NettyTcpTransport {
super(listener, remoteLocation, options);
}
+ @Override
+ protected void configureChannel(Channel channel) throws Exception {
+ channel.pipeline().addLast(TransportSupport.createSslHandler(getSslOptions()));
+
+ super.configureChannel(channel);
+ }
+
+ private TransportSslOptions getSslOptions() {
+ return (TransportSslOptions) options;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index f881481..a958e45 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -48,12 +48,12 @@ public class NettyTcpTransport implements Transport {
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
- private Bootstrap bootstrap;
- private EventLoopGroup group;
- private Channel channel;
- private TransportListener listener;
- private TransportOptions options;
- private final URI remote;
+ protected Bootstrap bootstrap;
+ protected EventLoopGroup group;
+ protected Channel channel;
+ protected TransportListener listener;
+ protected TransportOptions options;
+ protected final URI remote;
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
@@ -193,7 +193,7 @@ public class NettyTcpTransport implements Transport {
}
}
- protected void configureChannel(Channel channel) {
+ protected void configureChannel(Channel channel) throws Exception {
channel.pipeline().addLast(new NettyTcpTransportHandler());
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
new file mode 100644
index 0000000..ecc51e5
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportOptionsTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.junit.Test;
+
+/**
+ * Test for class TransportOptions
+ */
+public class TransportOptionsTest extends QpidJmsTestCase {
+
+ public static final int TEST_SEND_BUFFER_SIZE = 128 * 1024;
+ public static final int TEST_RECEIVE_BUFFER_SIZE = TEST_SEND_BUFFER_SIZE;
+ public static final int TEST_TRAFFIC_CLASS = 1;
+ public static final boolean TEST_TCP_NO_DELAY = false;
+ public static final boolean TEST_TCP_KEEP_ALIVE = true;
+ public static final int TEST_SO_LINGER = Short.MAX_VALUE;
+ public static final int TEST_SO_TIMEOUT = 10;
+ public static final int TEST_CONNECT_TIMEOUT = 90000;
+
+ @Test
+ public void testCreate() {
+ TransportOptions options = new TransportOptions();
+
+ assertEquals(TransportOptions.DEFAULT_TCP_NO_DELAY, options.isTcpNoDelay());
+ }
+
+ @Test
+ public void testOptions() {
+ TransportOptions options = createNonDefaultOptions();
+
+ assertEquals(TEST_SEND_BUFFER_SIZE, options.getSendBufferSize());
+ assertEquals(TEST_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
+ assertEquals(TEST_TRAFFIC_CLASS, options.getTrafficClass());
+ assertEquals(TEST_TCP_NO_DELAY, options.isTcpNoDelay());
+ assertEquals(TEST_TCP_KEEP_ALIVE, options.isTcpKeepAlive());
+ assertEquals(TEST_SO_LINGER, options.getSoLinger());
+ assertEquals(TEST_SO_TIMEOUT, options.getSoTimeout());
+ assertEquals(TEST_CONNECT_TIMEOUT, options.getConnectTimeout());
+ }
+
+ @Test
+ public void testClone() {
+ TransportOptions options = createNonDefaultOptions().clone();
+
+ assertEquals(TEST_SEND_BUFFER_SIZE, options.getSendBufferSize());
+ assertEquals(TEST_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
+ assertEquals(TEST_TRAFFIC_CLASS, options.getTrafficClass());
+ assertEquals(TEST_TCP_NO_DELAY, options.isTcpNoDelay());
+ assertEquals(TEST_TCP_KEEP_ALIVE, options.isTcpKeepAlive());
+ assertEquals(TEST_SO_LINGER, options.getSoLinger());
+ assertEquals(TEST_SO_TIMEOUT, options.getSoTimeout());
+ assertEquals(TEST_CONNECT_TIMEOUT, options.getConnectTimeout());
+ }
+
+ private TransportOptions createNonDefaultOptions() {
+ TransportOptions options = new TransportOptions();
+
+ options.setSendBufferSize(TEST_SEND_BUFFER_SIZE);
+ options.setReceiveBufferSize(TEST_RECEIVE_BUFFER_SIZE);
+ options.setTrafficClass(TEST_TRAFFIC_CLASS);
+ options.setTcpNoDelay(TEST_TCP_NO_DELAY);
+ options.setTcpKeepAlive(TEST_TCP_KEEP_ALIVE);
+ options.setSoLinger(TEST_SO_LINGER);
+ options.setSoTimeout(TEST_SO_TIMEOUT);
+ options.setConnectTimeout(TEST_CONNECT_TIMEOUT);
+
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java
new file mode 100644
index 0000000..d58fde8
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSslOptionsTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.junit.Test;
+
+/**
+ * Test for class TransportSslOptions
+ */
+public class TransportSslOptionsTest extends QpidJmsTestCase {
+
+ public static final String PASSWORD = "password";
+ public static final String CLINET_KEYSTORE = "src/test/resources/client-jks.keystore";
+ public static final String CLINET_TRUSTSTORE = "src/test/resources/client-jks.truststore";
+ public static final String KEYSTORE_TYPE = "jks";
+ public static final boolean TRUST_ALL = true;
+ public static final boolean VERIFY_HOST = true;
+
+ public static final int TEST_SEND_BUFFER_SIZE = 128 * 1024;
+ public static final int TEST_RECEIVE_BUFFER_SIZE = TEST_SEND_BUFFER_SIZE;
+ public static final int TEST_TRAFFIC_CLASS = 1;
+ public static final boolean TEST_TCP_NO_DELAY = false;
+ public static final boolean TEST_TCP_KEEP_ALIVE = true;
+ public static final int TEST_SO_LINGER = Short.MAX_VALUE;
+ public static final int TEST_SO_TIMEOUT = 10;
+ public static final int TEST_CONNECT_TIMEOUT = 90000;
+
+ @Test
+ public void testCreate() {
+ TransportSslOptions options = new TransportSslOptions();
+
+ assertEquals(TransportSslOptions.DEFAULT_TRUST_ALL, options.isTrustAll());
+ assertEquals(TransportSslOptions.DEFAULT_STORE_TYPE, options.getStoreType());
+
+ assertNull(options.getKeyStoreLocation());
+ assertNull(options.getKeyStorePassword());
+ assertNull(options.getTrustStoreLocation());
+ assertNull(options.getTrustStorePassword());
+ }
+
+ @Test
+ public void testCreateAndConfigure() {
+ TransportSslOptions options = createSslOptions();
+
+ assertEquals(TEST_SEND_BUFFER_SIZE, options.getSendBufferSize());
+ assertEquals(TEST_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
+ assertEquals(TEST_TRAFFIC_CLASS, options.getTrafficClass());
+ assertEquals(TEST_TCP_NO_DELAY, options.isTcpNoDelay());
+ assertEquals(TEST_TCP_KEEP_ALIVE, options.isTcpKeepAlive());
+ assertEquals(TEST_SO_LINGER, options.getSoLinger());
+ assertEquals(TEST_SO_TIMEOUT, options.getSoTimeout());
+ assertEquals(TEST_CONNECT_TIMEOUT, options.getConnectTimeout());
+
+ assertEquals(CLINET_KEYSTORE, options.getKeyStoreLocation());
+ assertEquals(PASSWORD, options.getKeyStorePassword());
+ assertEquals(CLINET_TRUSTSTORE, options.getTrustStoreLocation());
+ assertEquals(PASSWORD, options.getTrustStorePassword());
+ assertEquals(KEYSTORE_TYPE, options.getStoreType());
+ }
+
+ @Test
+ public void testClone() {
+ TransportSslOptions options = createSslOptions().clone();
+
+ assertEquals(TEST_SEND_BUFFER_SIZE, options.getSendBufferSize());
+ assertEquals(TEST_RECEIVE_BUFFER_SIZE, options.getReceiveBufferSize());
+ assertEquals(TEST_TRAFFIC_CLASS, options.getTrafficClass());
+ assertEquals(TEST_TCP_NO_DELAY, options.isTcpNoDelay());
+ assertEquals(TEST_TCP_KEEP_ALIVE, options.isTcpKeepAlive());
+ assertEquals(TEST_SO_LINGER, options.getSoLinger());
+ assertEquals(TEST_SO_TIMEOUT, options.getSoTimeout());
+ assertEquals(TEST_CONNECT_TIMEOUT, options.getConnectTimeout());
+
+ assertEquals(CLINET_KEYSTORE, options.getKeyStoreLocation());
+ assertEquals(PASSWORD, options.getKeyStorePassword());
+ assertEquals(CLINET_TRUSTSTORE, options.getTrustStoreLocation());
+ assertEquals(PASSWORD, options.getTrustStorePassword());
+ assertEquals(KEYSTORE_TYPE, options.getStoreType());
+ }
+
+ private TransportSslOptions createSslOptions() {
+ TransportSslOptions options = new TransportSslOptions();
+
+ options.setKeyStoreLocation(CLINET_KEYSTORE);
+ options.setTrustStoreLocation(CLINET_TRUSTSTORE);
+ options.setKeyStorePassword(PASSWORD);
+ options.setTrustStorePassword(PASSWORD);
+ options.setStoreType(KEYSTORE_TYPE);
+ options.setTrustAll(TRUST_ALL);
+ options.setVerifyHost(VERIFY_HOST);
+
+ options.setSendBufferSize(TEST_SEND_BUFFER_SIZE);
+ options.setReceiveBufferSize(TEST_RECEIVE_BUFFER_SIZE);
+ options.setTrafficClass(TEST_TRAFFIC_CLASS);
+ options.setTcpNoDelay(TEST_TCP_NO_DELAY);
+ options.setTcpKeepAlive(TEST_TCP_KEEP_ALIVE);
+ options.setSoLinger(TEST_SO_LINGER);
+ options.setSoTimeout(TEST_SO_TIMEOUT);
+ options.setConnectTimeout(TEST_CONNECT_TIMEOUT);
+
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java
new file mode 100644
index 0000000..1f26e32
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/TransportSupportTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.junit.Test;
+
+/**
+ * Tests for the TransmportSupport class.
+ */
+public class TransportSupportTest extends QpidJmsTestCase {
+
+ public static final String PASSWORD = "password";
+ public static final String BROKER_KEYSTORE = "src/test/resources/broker-jks.keystore";
+ public static final String BROKER_TRUSTSTORE = "src/test/resources/broker-jks.truststore";
+ public static final String CLINET_KEYSTORE = "src/test/resources/client-jks.keystore";
+ public static final String CLINET_TRUSTSTORE = "src/test/resources/client-jks.truststore";
+ public static final String KEYSTORE_TYPE = "jks";
+
+ @Test
+ public void testCreateSslContext() throws Exception {
+ TransportSslOptions options = createSslOptions();
+
+ SSLContext context = TransportSupport.createSslContext(options);
+ assertNotNull(context);
+
+ assertEquals("TLS", context.getProtocol());
+ }
+
+ @Test
+ public void testCreateSslEngine() throws Exception {
+ TransportSslOptions options = createSslOptions();
+
+ SSLContext context = TransportSupport.createSslContext(options);
+ assertNotNull(context);
+
+ SSLEngine engine = TransportSupport.createSslEngine(context, options);
+ assertNotNull(engine);
+
+ List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols());
+ List<String> defaultProtocols = Arrays.asList(TransportSslOptions.DEFAULT_ENABLED_PROTOCOLS);
+
+ assertThat(engineProtocols, containsInAnyOrder(defaultProtocols.toArray()));
+ }
+
+ private TransportSslOptions createSslOptions() {
+ TransportSslOptions options = new TransportSslOptions();
+
+ options.setKeyStoreLocation(CLINET_KEYSTORE);
+ options.setTrustStoreLocation(CLINET_TRUSTSTORE);
+ options.setKeyStorePassword(PASSWORD);
+ options.setTrustStorePassword(PASSWORD);
+
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/broker-jks.keystore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/broker-jks.keystore b/qpid-jms-client/src/test/resources/broker-jks.keystore
new file mode 100644
index 0000000..e23f1a9
Binary files /dev/null and b/qpid-jms-client/src/test/resources/broker-jks.keystore differ
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/broker-jks.truststore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/broker-jks.truststore b/qpid-jms-client/src/test/resources/broker-jks.truststore
new file mode 100644
index 0000000..b339423
Binary files /dev/null and b/qpid-jms-client/src/test/resources/broker-jks.truststore differ
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/client-jks.keystore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/client-jks.keystore b/qpid-jms-client/src/test/resources/client-jks.keystore
new file mode 100644
index 0000000..e0474af
Binary files /dev/null and b/qpid-jms-client/src/test/resources/client-jks.keystore differ
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9d2ed0d6/qpid-jms-client/src/test/resources/client-jks.truststore
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/resources/client-jks.truststore b/qpid-jms-client/src/test/resources/client-jks.truststore
new file mode 100644
index 0000000..f9ab34f
Binary files /dev/null and b/qpid-jms-client/src/test/resources/client-jks.truststore differ
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org