You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/07 19:50:51 UTC
qpid-jms git commit: Adds some basic work on a replacement transport
which uses Netty directly instead of Vert.x also adds some tests
Repository: qpid-jms
Updated Branches:
refs/heads/master 8e3f1bd50 -> b95ac58df
Adds some basic work on a replacement transport which uses Netty
directly instead of Vert.x also adds some tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b95ac58d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b95ac58d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b95ac58d
Branch: refs/heads/master
Commit: b95ac58df115c4ba0ce57f2e9585de5571756845
Parents: 8e3f1bd
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jan 7 13:50:36 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 7 13:50:36 2015 -0500
----------------------------------------------------------------------
.../qpid/jms/transports/NettyTcpTransport.java | 204 +++++++++++++++++++
.../jms/transports/TcpTransportOptions.java | 153 ++++++++++++++
.../java/org/apache/qpid/jms/test/Wait.java | 48 +++++
.../qpid/jms/test/netty/NettyEchoServer.java | 132 ++++++++++++
.../jms/test/netty/NettyTcpTransportTest.java | 185 +++++++++++++++++
5 files changed, 722 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
new file mode 100644
index 0000000..ebd385d
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ *
+ * Inbound data, remote closure and channel errors are reported to the configured
+ * {@link TransportListener} from the Netty channel handler callbacks.
+ */
+public class NettyTcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+    private Bootstrap bootstrap;
+    private EventLoopGroup group;
+    private Channel channel;
+    private TransportListener listener;
+    private final TcpTransportOptions options;
+    private final URI remote;
+
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the listener that receives data and connection events. It may be replaced later
+     *        through {@link #setTransportListener(TransportListener)} but must be non-null
+     *        when {@link #connect()} is called.
+     * @param remoteLocation
+     *        the URI whose host and port identify the remote peer to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(TransportListener listener, URI remoteLocation, TcpTransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remote = remoteLocation;
+    }
+
+    /**
+     * Opens the connection to the remote peer and blocks until the attempt completes.
+     *
+     * @throws IllegalStateException if no transport listener has been set.
+     * @throws IOException if the connection attempt is cancelled or fails.
+     */
+    @Override
+    public void connect() throws IOException {
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        group = new NioEventLoopGroup();
+
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+
+            @Override
+            public void initChannel(Channel connectedChannel) throws Exception {
+                channel = connectedChannel;
+                channel.pipeline().addLast(new NettyTcpTransportHandler());
+            }
+        });
+
+        configureNetty(bootstrap, options);
+
+        ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort());
+        future.awaitUninterruptibly();
+
+        if (future.isSuccess()) {
+            connected.set(true);
+            return;
+        }
+
+        // The attempt did not succeed, so release the event loop threads created above;
+        // otherwise every failed connect would leave them running.
+        group.shutdownGracefully();
+
+        if (future.isCancelled()) {
+            throw new IOException("Connection attempt was cancelled");
+        }
+
+        throw IOExceptionSupport.create(future.cause());
+    }
+
+    /**
+     * @return true once {@link #connect()} has succeeded and until the transport is closed
+     *         locally, the channel goes inactive or a channel error is reported.
+     */
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    /**
+     * Closes the channel and shuts down the event loop group. Only the first call has any
+     * effect, and calling it on a transport that was never connected is a no-op.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            // The channel handler ignores events once 'closed' is set, so the connected
+            // flag has to be cleared here for a locally initiated close.
+            connected.set(false);
+
+            // Both fields stay null when connect() was never called.
+            if (channel != null) {
+                channel.close();
+            }
+            if (group != null) {
+                group.shutdownGracefully();
+            }
+        }
+    }
+
+    /**
+     * Wraps the given NIO buffer and sends it to the remote peer.
+     *
+     * @param output
+     *        the bytes to send.
+     *
+     * @throws IOException if the transport is not currently connected.
+     */
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        send(Unpooled.wrappedBuffer(output));
+    }
+
+    /**
+     * Writes the given buffer to the channel and flushes it.
+     *
+     * @param output
+     *        the bytes to send.
+     *
+     * @throws IOException if the transport is not currently connected.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        channel.writeAndFlush(output);
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        this.listener = listener;
+    }
+
+    //----- Internal implementation details ----------------------------------//
+
+    /**
+     * Fails fast when there is no usable channel instead of hitting a null channel or
+     * writing into a connection that has already been reported as lost.
+     */
+    private void checkConnected() throws IOException {
+        if (channel == null || !connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    /**
+     * Applies the socket level settings from the given options to the bootstrap. The send
+     * buffer, receive buffer and traffic class options are skipped when their value is -1.
+     *
+     * @param bootstrap
+     *        the bootstrap that will create the connection.
+     * @param options
+     *        the options to read the socket settings from.
+     */
+    protected void configureNetty(Bootstrap bootstrap, TcpTransportOptions options) {
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        if (options.getSendBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+        }
+
+        // The receive side must be sized from the receive buffer option, not the send one.
+        if (options.getReceiveBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+        }
+
+        if (options.getTrafficClass() != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+        }
+    }
+
+    //----- Handle connection events -----------------------------------------//
+
+    /**
+     * Channel handler that forwards reads, remote closure and errors to the listener.
+     * Closure and error events are suppressed once the transport has been closed locally.
+     */
+    private class NettyTcpTransportHandler extends ChannelInboundHandlerAdapter {
+
+        @Override
+        public void channelActive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has become active! Channel is {}", context.channel());
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext context, Object inbound) throws Exception {
+            ByteBuf buffer = (ByteBuf) inbound;
+            LOG.info("New data read: {} bytes incoming", buffer.readableBytes());
+            try {
+                listener.onData(new Buffer(buffer));
+            } finally {
+                // The buffer is released as soon as the listener returns, so the listener
+                // must not keep using it afterwards without taking its own copy.
+                ReferenceCountUtil.release(inbound);
+            }
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has gone inactive! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportClosed();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+            LOG.info("Exception on channel! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportError(cause);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
new file mode 100644
index 0000000..e5f90c3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+/**
+ * Encapsulates all the TCP Transport options in one configuration object.
+ */
+public class TcpTransportOptions {
+
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
+    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
+    public static final int DEFAULT_TRAFFIC_CLASS = 0;
+    public static final boolean DEFAULT_TCP_NO_DELAY = true;
+    public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
+    public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
+    public static final int DEFAULT_SO_TIMEOUT = -1;
+    public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+
+    private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+    private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+    private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+    private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+    private int soTimeout = DEFAULT_SO_TIMEOUT;
+    private int soLinger = DEFAULT_SO_LINGER;
+    private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
+    private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+    /**
+     * @return the currently set send buffer size in bytes.
+     */
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    /**
+     * Sets the send buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param sendBufferSize
+     *        the new send buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setSendBufferSize(int sendBufferSize) {
+        if (sendBufferSize <= 0) {
+            throw new IllegalArgumentException("The send buffer size must be > 0");
+        }
+
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    /**
+     * @return the currently configured receive buffer size in bytes.
+     */
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    /**
+     * Sets the receive buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param receiveBufferSize
+     *        the new receive buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        if (receiveBufferSize <= 0) {
+            // Name the option that was actually rejected, not the send buffer.
+            throw new IllegalArgumentException("The receive buffer size must be > 0");
+        }
+
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the currently configured traffic class value.
+     */
+    public int getTrafficClass() {
+        return trafficClass;
+    }
+
+    /**
+     * Sets the traffic class value used by the TCP connection, valid
+     * range is between 0 and 255.
+     *
+     * @param trafficClass
+     *        the new traffic class value.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setTrafficClass(int trafficClass) {
+        if (trafficClass < 0 || trafficClass > 255) {
+            throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
+        }
+
+        this.trafficClass = trafficClass;
+    }
+
+    /**
+     * @return the currently configured socket timeout value, {@link #DEFAULT_SO_TIMEOUT} if never set.
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * Sets the socket timeout value; the value is stored as given without validation.
+     *
+     * @param soTimeout
+     *        the new socket timeout value.
+     */
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    /**
+     * @return true if the TCP no-delay option is enabled, which is the default.
+     */
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * @param tcpNoDelay
+     *        true to enable the TCP no-delay option, false to disable it.
+     */
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    /**
+     * @return the currently configured socket linger value, {@link #DEFAULT_SO_LINGER} if never set.
+     */
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    /**
+     * Sets the socket linger value; the value is stored as given without validation.
+     *
+     * @param soLinger
+     *        the new socket linger value.
+     */
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    /**
+     * @return true if the TCP keep-alive option is enabled, it is disabled by default.
+     */
+    public boolean isTcpKeepAlive() {
+        return tcpKeepAlive;
+    }
+
+    /**
+     * @param keepAlive
+     *        true to enable the TCP keep-alive option, false to disable it.
+     */
+    public void setTcpKeepAlive(boolean keepAlive) {
+        this.tcpKeepAlive = keepAlive;
+    }
+
+    /**
+     * @return the currently configured connect timeout, {@link #DEFAULT_CONNECT_TIMEOUT} if never set.
+     */
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    /**
+     * Sets the connect timeout; the value is stored as given without validation.
+     *
+     * @param connectTimeout
+     *        the new connect timeout value.
+     */
+    public void setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java
new file mode 100644
index 0000000..e95a2a9
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test helper that polls a {@link Condition} until it is satisfied or a timeout elapses.
+ */
+public class Wait {
+
+    /** Timeout in milliseconds used when the caller does not supply one. */
+    public static final long MAX_WAIT_MILLIS = 30 * 1000;
+
+    /** Pause in milliseconds between checks when the caller does not supply one. */
+    public static final int SLEEP_MILLIS = 1000;
+
+    /**
+     * A check that is evaluated repeatedly until it reports true.
+     */
+    public interface Condition {
+        boolean isSatisified() throws Exception;
+    }
+
+    /**
+     * Waits up to {@link #MAX_WAIT_MILLIS} for the condition, checking every {@link #SLEEP_MILLIS}.
+     *
+     * @param condition
+     *        the condition to poll.
+     *
+     * @return true if the condition was satisfied before the timeout elapsed.
+     *
+     * @throws Exception if the condition throws or the waiting thread is interrupted.
+     */
+    public static boolean waitFor(Condition condition) throws Exception {
+        return waitFor(condition, MAX_WAIT_MILLIS);
+    }
+
+    /**
+     * Waits up to the given duration for the condition, checking every {@link #SLEEP_MILLIS}.
+     *
+     * @param condition
+     *        the condition to poll.
+     * @param duration
+     *        the maximum time to wait in milliseconds.
+     *
+     * @return true if the condition was satisfied before the timeout elapsed.
+     *
+     * @throws Exception if the condition throws or the waiting thread is interrupted.
+     */
+    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+        return waitFor(condition, duration, SLEEP_MILLIS);
+    }
+
+    /**
+     * Polls the condition until it is satisfied or the given duration has elapsed. The
+     * condition is always checked at least once, even when the duration is zero or negative.
+     *
+     * @param condition
+     *        the condition to poll.
+     * @param duration
+     *        the maximum time to wait in milliseconds.
+     * @param sleepMillis
+     *        the pause between two checks in milliseconds.
+     *
+     * @return true if the condition was satisfied before the timeout elapsed.
+     *
+     * @throws Exception if the condition throws or the waiting thread is interrupted.
+     */
+    public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception {
+
+        // Measure elapsed time with the monotonic clock: adding the duration to the wall
+        // clock overflows for very large durations and is disturbed by clock adjustments.
+        final long startNanos = System.nanoTime();
+        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(Math.max(0L, duration));
+        final long pauseNanos = TimeUnit.MILLISECONDS.toNanos(sleepMillis);
+
+        boolean conditionSatisified = condition.isSatisified();
+        while (!conditionSatisified) {
+            final long remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
+            if (remainingNanos <= 0) {
+                break;
+            }
+
+            // Never sleep past the deadline so the last check happens when the time is up.
+            TimeUnit.NANOSECONDS.sleep(Math.min(pauseNanos, remainingNanos));
+            conditionSatisified = condition.isSatisified();
+        }
+        return conditionSatisified;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
new file mode 100644
index 0000000..a3f5bcd
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.test.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+
+/**
+ * Simple Netty Server used to echo all data.
+ */
+public class NettyEchoServer implements AutoCloseable {
+
+ static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private Channel serverChannel;
+ private int serverPort;
+
+ private final AtomicBoolean started = new AtomicBoolean();
+
+ public void start() throws Exception {
+
+ if (started.compareAndSet(false, true)) {
+
+ // Configure the server.
+ bossGroup = new NioEventLoopGroup(1);
+ workerGroup = new NioEventLoopGroup();
+
+ ServerBootstrap server = new ServerBootstrap();
+ server.group(bossGroup, workerGroup);
+ server.channel(NioServerSocketChannel.class);
+ server.option(ChannelOption.SO_BACKLOG, 100);
+ server.handler(new LoggingHandler(LogLevel.INFO));
+ server.childHandler(new ChannelInitializer<Channel>() {
+ @Override
+ public void initChannel(Channel ch) throws Exception {
+ ch.pipeline().addLast(new EchoServerHandler());
+ }
+ });
+
+ // Start the server.
+ serverChannel = server.bind(getServerPort()).sync().channel();
+ }
+ }
+
+ public void stop() {
+ if (started.compareAndSet(true, false)) {
+ try {
+ serverChannel.close().sync();
+ } catch (InterruptedException e) {
+ }
+ // Shut down all event loops to terminate all threads.
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+
+ @Override
+ public void close() {
+ stop();
+ }
+
+ public int getServerPort() {
+ if (serverPort == 0) {
+ ServerSocket ss = null;
+ try {
+ ss = ServerSocketFactory.getDefault().createServerSocket(0);
+ serverPort = ss.getLocalPort();
+ } catch (IOException e) { // revert back to default
+ serverPort = PORT;
+ } finally {
+ try {
+ if (ss != null ) {
+ ss.close();
+ }
+ } catch (IOException e) { // ignore
+ }
+ }
+ }
+ return serverPort;
+ }
+
+ private class EchoServerHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ ctx.write(msg);
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) {
+ ctx.flush();
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ // Close the connection when an exception is raised.
+ cause.printStackTrace();
+ ctx.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
new file mode 100644
index 0000000..2612c05
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.test.netty;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.transports.NettyTcpTransport;
+import org.apache.qpid.jms.transports.TcpTransportOptions;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.buffer.Buffer;
+
+/**
+ * Test basic functionality of the Netty based TCP transport.
+ *
+ * The listener callbacks run on a transport thread while the assertions run on the test
+ * thread, so all state recorded by the listener is held in thread-safe containers.
+ */
+public class NettyTcpTransportTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
+
+    private volatile boolean transportClosed;
+    private final List<Throwable> exceptions = java.util.Collections.synchronizedList(new ArrayList<Throwable>());
+    private final List<Buffer> data = java.util.Collections.synchronizedList(new ArrayList<Buffer>());
+    private final java.util.concurrent.atomic.AtomicInteger bytesReceived = new java.util.concurrent.atomic.AtomicInteger();
+
+    private final TransportListener testListener = new NettyTransportListener();
+    private final TcpTransportOptions testOptions = new TcpTransportOptions();
+
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServer() throws Exception {
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            NettyTcpTransport transport = createConnectedTransport(server);
+            try {
+                assertTrue(transport.isConnected());
+            } finally {
+                // Always release the transport, even when the assertion above fails.
+                transport.close();
+            }
+        }
+
+        assertFalse(transportClosed); // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDetectServerClose() throws Exception {
+        NettyTcpTransport transport = null;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            transport = createConnectedTransport(server);
+
+            assertTrue(transport.isConnected());
+
+            server.close();
+        }
+
+        try {
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return transportClosed;
+                }
+            }));
+            assertTrue(exceptions.isEmpty());
+            assertTrue(data.isEmpty());
+            assertFalse(transport.isConnected());
+        } finally {
+            try {
+                transport.close();
+            } catch (Exception ex) {
+                LOG.warn("Close of the disconnected transport failed", ex);
+                fail("Close of a disconnect transport should not generate errors");
+            }
+        }
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testDataSentIsReceived() throws Exception {
+        final int SEND_BYTE_COUNT = 1024;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            NettyTcpTransport transport = createConnectedTransport(server);
+            try {
+                assertTrue(transport.isConnected());
+
+                ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
+                for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
+                    sendBuffer.writeByte('A');
+                }
+
+                transport.send(sendBuffer);
+
+                // TCP gives no guarantee that the echo arrives in a single read, so wait
+                // for the total number of bytes instead of inspecting the first buffer.
+                assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                    @Override
+                    public boolean isSatisified() throws Exception {
+                        return bytesReceived.get() >= SEND_BYTE_COUNT;
+                    }
+                }));
+
+                assertEquals(SEND_BYTE_COUNT, bytesReceived.get());
+                assertFalse(data.isEmpty());
+            } finally {
+                transport.close();
+            }
+        }
+
+        assertFalse(transportClosed); // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+    }
+
+    /**
+     * Creates a transport wired to the test listener and connects it to the given server,
+     * failing the test if the connection cannot be established.
+     */
+    private NettyTcpTransport createConnectedTransport(NettyEchoServer server) throws Exception {
+        URI serverLocation = new URI("tcp://localhost:" + server.getServerPort());
+
+        NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+        try {
+            transport.connect();
+            LOG.info("Connected to test server.");
+        } catch (Exception e) {
+            // Log the cause first, otherwise the reason for the failure is lost.
+            LOG.warn("Failed to connect to the test server", e);
+            fail("Should have connected to the server");
+        }
+
+        return transport;
+    }
+
+    /**
+     * Records every event reported by the transport so the tests can assert on them.
+     */
+    private class NettyTransportListener implements TransportListener {
+
+        @Override
+        public void onData(Buffer incoming) {
+            LOG.info("Client has new incoming data of size: {}", incoming.length());
+            // Take the length now: the transport releases the underlying buffer once this
+            // callback returns, so it should not be read later from the test thread.
+            bytesReceived.addAndGet(incoming.length());
+            data.add(incoming);
+        }
+
+        @Override
+        public void onTransportClosed() {
+            LOG.info("Transport reports that it has closed.");
+            transportClosed = true;
+        }
+
+        @Override
+        public void onTransportError(Throwable cause) {
+            LOG.info("Transport error caught: {}", cause.getMessage());
+            exceptions.add(cause);
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org