You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/07 19:50:51 UTC

qpid-jms git commit: Adds some basic work on a replacement transport which uses Netty directly instead of Vert.x; also adds some tests

Repository: qpid-jms
Updated Branches:
  refs/heads/master 8e3f1bd50 -> b95ac58df


Adds some basic work on a replacement transport which uses Netty
directly instead of Vert.x; also adds some tests.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b95ac58d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b95ac58d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b95ac58d

Branch: refs/heads/master
Commit: b95ac58df115c4ba0ce57f2e9585de5571756845
Parents: 8e3f1bd
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jan 7 13:50:36 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jan 7 13:50:36 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/transports/NettyTcpTransport.java  | 204 +++++++++++++++++++
 .../jms/transports/TcpTransportOptions.java     | 153 ++++++++++++++
 .../java/org/apache/qpid/jms/test/Wait.java     |  48 +++++
 .../qpid/jms/test/netty/NettyEchoServer.java    | 132 ++++++++++++
 .../jms/test/netty/NettyTcpTransportTest.java   | 185 +++++++++++++++++
 5 files changed, 722 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
new file mode 100644
index 0000000..ebd385d
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/NettyTcpTransport.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.ReferenceCountUtil;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.buffer.Buffer;
+import org.vertx.java.core.net.impl.PartialPooledByteBufAllocator;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ * <p>
+ * An instance targets a single remote {@link URI}. Inbound data, remote closure and
+ * channel errors are forwarded to the configured {@link TransportListener} from the
+ * channel handler callbacks installed by {@link #connect()}.
+ */
+public class NettyTcpTransport implements Transport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+    private Bootstrap bootstrap;
+    private EventLoopGroup group;
+    private Channel channel;
+    private TransportListener listener;
+    private final TcpTransportOptions options;
+    private final URI remote;
+
+    // connected: set once connect() succeeds, cleared on close, channel inactivity or error.
+    // closed: set once by close(); suppresses listener notifications for a local shutdown.
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the listener that receives data and connection events; may be set later
+     *        via {@link #setTransportListener(TransportListener)} but must be non-null
+     *        before {@link #connect()} is called.
+     * @param remoteLocation
+     *        the URI whose host and port identify the remote peer to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(TransportListener listener, URI remoteLocation, TcpTransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remote = remoteLocation;
+    }
+
+    /**
+     * Opens the connection to the remote peer and blocks until the attempt completes.
+     *
+     * @throws IllegalStateException if no transport listener has been set.
+     * @throws IOException if the connection attempt is cancelled or fails.
+     */
+    @Override
+    public void connect() throws IOException {
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        group = new NioEventLoopGroup();
+
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+
+            @Override
+            public void initChannel(Channel connectedChannel) throws Exception {
+                channel = connectedChannel;
+                channel.pipeline().addLast(new NettyTcpTransportHandler());
+            }
+        });
+
+        configureNetty(bootstrap, options);
+
+        ChannelFuture future = bootstrap.connect(remote.getHost(), remote.getPort());
+        future.awaitUninterruptibly();
+
+        if (future.isSuccess()) {
+            connected.set(true);
+            return;
+        }
+
+        // The attempt did not succeed: release the event loop threads created above so a
+        // failed connect does not leak them, then report why the attempt failed.
+        group.shutdownGracefully();
+
+        if (future.isCancelled()) {
+            throw new IOException("Connection attempt was cancelled");
+        }
+
+        throw IOExceptionSupport.create(future.cause());
+    }
+
+    /**
+     * @return true if {@link #connect()} succeeded and the transport has since neither been
+     *         closed nor observed the channel going inactive or raising an error.
+     */
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    /**
+     * Closes the channel and shuts down the event loop group. Only the first call has any
+     * effect, and calling this on a transport that was never connected is a no-op.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            connected.set(false);
+
+            // Either may still be null if connect() was never called.
+            if (channel != null) {
+                channel.close();
+            }
+            if (group != null) {
+                group.shutdownGracefully();
+            }
+        }
+    }
+
+    /**
+     * Wraps the given buffer and sends it via {@link #send(ByteBuf)}.
+     *
+     * @param output
+     *        the bytes to write to the remote peer.
+     *
+     * @throws IOException if the transport is not currently connected.
+     */
+    @Override
+    public void send(ByteBuffer output) throws IOException {
+        send(Unpooled.wrappedBuffer(output));
+    }
+
+    /**
+     * Writes the given buffer to the channel and flushes it.
+     *
+     * @param output
+     *        the bytes to write to the remote peer.
+     *
+     * @throws IOException if the transport is not currently connected.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        if (channel == null || !connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+
+        channel.write(output);
+        channel.flush();
+    }
+
+    @Override
+    public TransportListener getTransportListener() {
+        return listener;
+    }
+
+    @Override
+    public void setTransportListener(TransportListener listener) {
+        this.listener = listener;
+    }
+
+    //----- Internal implementation details ----------------------------------//
+
+    /**
+     * Applies the given transport options to the Netty bootstrap before connecting.
+     *
+     * @param bootstrap
+     *        the bootstrap to configure.
+     * @param options
+     *        the options holding the socket level settings to apply.
+     */
+    protected void configureNetty(Bootstrap bootstrap, TcpTransportOptions options) {
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        if (options.getSendBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+        }
+
+        // The receive side must be sized from the receive buffer option, not the send one.
+        if (options.getReceiveBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+        }
+
+        if (options.getTrafficClass() != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+        }
+    }
+
+    //----- Handle connection events -----------------------------------------//
+
+    /**
+     * Channel handler that forwards channel events to the transport listener. Closure and
+     * error events are only forwarded while the transport has not been closed locally.
+     */
+    private class NettyTcpTransportHandler extends ChannelInboundHandlerAdapter {
+
+        @Override
+        public void channelActive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has become active! Channel is {}", context.channel());
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext context, Object inbound) throws Exception {
+            ByteBuf buffer = (ByteBuf) inbound;
+            LOG.info("New data read: {} bytes incoming", buffer.readableBytes());
+            try {
+                // NOTE(review): the inbound buffer is released as soon as onData returns; if
+                // Buffer wraps rather than copies it, listeners must not retain it - confirm.
+                listener.onData(new Buffer(buffer));
+            } finally {
+                ReferenceCountUtil.release(inbound);
+            }
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext context) throws Exception {
+            LOG.info("Channel has gone inactive! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportClosed();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+            LOG.info("Exception on channel! Channel is {}", context.channel());
+            if (!closed.get()) {
+                connected.set(false);
+                listener.onTransportError(cause);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
new file mode 100644
index 0000000..e5f90c3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TcpTransportOptions.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.transports;
+
+/**
+ * Encapsulates all the TCP Transport options in one configuration object.
+ * <p>
+ * Every option starts out at the value of its matching {@code DEFAULT_*} constant.
+ */
+public class TcpTransportOptions {
+
+    public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
+    public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
+    public static final int DEFAULT_TRAFFIC_CLASS = 0;
+    public static final boolean DEFAULT_TCP_NO_DELAY = true;
+    public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
+    public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
+    public static final int DEFAULT_SO_TIMEOUT = -1;
+    public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+
+    private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+    private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+    private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+    private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+    private int soTimeout = DEFAULT_SO_TIMEOUT;
+    private int soLinger = DEFAULT_SO_LINGER;
+    private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
+    private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+
+    /**
+     * @return the currently set send buffer size in bytes.
+     */
+    public int getSendBufferSize() {
+        return sendBufferSize;
+    }
+
+    /**
+     * Sets the send buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param sendBufferSize
+     *        the new send buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setSendBufferSize(int sendBufferSize) {
+        if (sendBufferSize <= 0) {
+            throw new IllegalArgumentException("The send buffer size must be > 0");
+        }
+
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    /**
+     * @return the currently configured receive buffer size in bytes.
+     */
+    public int getReceiveBufferSize() {
+        return receiveBufferSize;
+    }
+
+    /**
+     * Sets the receive buffer size in bytes, the value must be greater than zero
+     * or an {@link IllegalArgumentException} will be thrown.
+     *
+     * @param receiveBufferSize
+     *        the new receive buffer size for the TCP Transport.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setReceiveBufferSize(int receiveBufferSize) {
+        if (receiveBufferSize <= 0) {
+            throw new IllegalArgumentException("The receive buffer size must be > 0");
+        }
+
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    /**
+     * @return the currently configured traffic class value.
+     */
+    public int getTrafficClass() {
+        return trafficClass;
+    }
+
+    /**
+     * Sets the traffic class value used by the TCP connection, valid
+     * range is between 0 and 255.
+     *
+     * @param trafficClass
+     *        the new traffic class value.
+     *
+     * @throws IllegalArgumentException if the value given is not in the valid range.
+     */
+    public void setTrafficClass(int trafficClass) {
+        if (trafficClass < 0 || trafficClass > 255) {
+            throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
+        }
+
+        this.trafficClass = trafficClass;
+    }
+
+    /**
+     * @return the currently configured socket timeout value, {@link #DEFAULT_SO_TIMEOUT}
+     *         unless changed. NOTE(review): units are not established in this class -
+     *         presumably milliseconds; confirm against the consumer of this option.
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * Sets the socket timeout value. The value is stored without validation.
+     *
+     * @param soTimeout
+     *        the new socket timeout value.
+     */
+    public void setSoTimeout(int soTimeout) {
+        this.soTimeout = soTimeout;
+    }
+
+    /**
+     * @return true if the TCP no-delay option is enabled.
+     */
+    public boolean isTcpNoDelay() {
+        return tcpNoDelay;
+    }
+
+    /**
+     * Enables or disables the TCP no-delay option.
+     *
+     * @param tcpNoDelay
+     *        true to enable TCP no-delay, false to disable it.
+     */
+    public void setTcpNoDelay(boolean tcpNoDelay) {
+        this.tcpNoDelay = tcpNoDelay;
+    }
+
+    /**
+     * @return the currently configured socket linger value, {@link #DEFAULT_SO_LINGER}
+     *         unless changed.
+     */
+    public int getSoLinger() {
+        return soLinger;
+    }
+
+    /**
+     * Sets the socket linger value. The value is stored without validation.
+     *
+     * @param soLinger
+     *        the new socket linger value.
+     */
+    public void setSoLinger(int soLinger) {
+        this.soLinger = soLinger;
+    }
+
+    /**
+     * @return true if the TCP keep-alive option is enabled.
+     */
+    public boolean isTcpKeepAlive() {
+        return tcpKeepAlive;
+    }
+
+    /**
+     * Enables or disables the TCP keep-alive option.
+     *
+     * @param keepAlive
+     *        true to enable TCP keep-alive, false to disable it.
+     */
+    public void setTcpKeepAlive(boolean keepAlive) {
+        this.tcpKeepAlive = keepAlive;
+    }
+
+    /**
+     * @return the currently configured connect timeout, {@link #DEFAULT_CONNECT_TIMEOUT}
+     *         unless changed. The Netty transport applies this value as a timeout in
+     *         milliseconds.
+     */
+    public int getConnectTimeout() {
+        return connectTimeout;
+    }
+
+    /**
+     * Sets the connect timeout. The value is stored without validation.
+     *
+     * @param connectTimeout
+     *        the new connect timeout value.
+     */
+    public void setConnectTimeout(int connectTimeout) {
+        this.connectTimeout = connectTimeout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java
new file mode 100644
index 0000000..e95a2a9
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/Wait.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.test;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test helper that polls a {@link Condition} until it holds or a time limit expires.
+ */
+public class Wait {
+
+    public static final long MAX_WAIT_MILLIS = 30 * 1000;
+    public static final int SLEEP_MILLIS = 1000;
+
+    /**
+     * A check that is evaluated repeatedly until it reports true.
+     */
+    public interface Condition {
+        boolean isSatisified() throws Exception;
+    }
+
+    /**
+     * Polls the condition using {@link #MAX_WAIT_MILLIS} as the time limit and
+     * {@link #SLEEP_MILLIS} as the pause between checks.
+     *
+     * @param condition
+     *        the condition to evaluate.
+     *
+     * @return the result of the last evaluation of the condition.
+     *
+     * @throws Exception if the condition throws or the sleeping thread is interrupted.
+     */
+    public static boolean waitFor(Condition condition) throws Exception {
+        return waitFor(condition, MAX_WAIT_MILLIS, SLEEP_MILLIS);
+    }
+
+    /**
+     * Polls the condition for up to the given duration, pausing {@link #SLEEP_MILLIS}
+     * between checks.
+     *
+     * @param condition
+     *        the condition to evaluate.
+     * @param duration
+     *        the time limit in milliseconds.
+     *
+     * @return the result of the last evaluation of the condition.
+     *
+     * @throws Exception if the condition throws or the sleeping thread is interrupted.
+     */
+    public static boolean waitFor(final Condition condition, final long duration) throws Exception {
+        return waitFor(condition, duration, SLEEP_MILLIS);
+    }
+
+    /**
+     * Polls the condition for up to the given duration. The condition is always evaluated
+     * at least once, and once more after every pause.
+     *
+     * @param condition
+     *        the condition to evaluate.
+     * @param duration
+     *        the time limit in milliseconds.
+     * @param sleepMillis
+     *        the pause between evaluations in milliseconds.
+     *
+     * @return the result of the last evaluation of the condition.
+     *
+     * @throws Exception if the condition throws or the sleeping thread is interrupted.
+     */
+    public static boolean waitFor(final Condition condition, final long duration, final int sleepMillis) throws Exception {
+
+        final long deadline = System.currentTimeMillis() + duration;
+
+        while (true) {
+            if (condition.isSatisified()) {
+                return true;
+            }
+
+            // Give up only after a failed check, so the final answer is always fresh.
+            if (System.currentTimeMillis() >= deadline) {
+                return false;
+            }
+
+            TimeUnit.MILLISECONDS.sleep(sleepMillis);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
new file mode 100644
index 0000000..a3f5bcd
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyEchoServer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.test.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.logging.LogLevel;
+import io.netty.handler.logging.LoggingHandler;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.net.ServerSocketFactory;
+
+/**
+ * Simple Netty Server used to echo all data.
+ */
+public class NettyEchoServer implements AutoCloseable {
+
+    static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
+
+    private EventLoopGroup bossGroup;
+    private EventLoopGroup workerGroup;
+    private Channel serverChannel;
+    private int serverPort;
+
+    private final AtomicBoolean started = new AtomicBoolean();
+
+    public void start() throws Exception {
+
+        if (started.compareAndSet(false, true)) {
+
+            // Configure the server.
+            bossGroup = new NioEventLoopGroup(1);
+            workerGroup = new NioEventLoopGroup();
+
+            ServerBootstrap server = new ServerBootstrap();
+            server.group(bossGroup, workerGroup);
+            server.channel(NioServerSocketChannel.class);
+            server.option(ChannelOption.SO_BACKLOG, 100);
+            server.handler(new LoggingHandler(LogLevel.INFO));
+            server.childHandler(new ChannelInitializer<Channel>() {
+                @Override
+                public void initChannel(Channel ch) throws Exception {
+                    ch.pipeline().addLast(new EchoServerHandler());
+                }
+            });
+
+            // Start the server.
+            serverChannel = server.bind(getServerPort()).sync().channel();
+        }
+    }
+
+    public void stop() {
+        if (started.compareAndSet(true, false)) {
+            try {
+                serverChannel.close().sync();
+            } catch (InterruptedException e) {
+            }
+            // Shut down all event loops to terminate all threads.
+            bossGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        }
+    }
+
+    @Override
+    public void close() {
+        stop();
+    }
+
+    public int getServerPort() {
+        if (serverPort == 0) {
+            ServerSocket ss = null;
+            try {
+                ss = ServerSocketFactory.getDefault().createServerSocket(0);
+                serverPort = ss.getLocalPort();
+            } catch (IOException e) { // revert back to default
+                serverPort = PORT;
+            } finally {
+                try {
+                    if (ss != null ) {
+                        ss.close();
+                    }
+                } catch (IOException e) { // ignore
+                }
+            }
+        }
+        return serverPort;
+    }
+
+    private class EchoServerHandler extends ChannelInboundHandlerAdapter {
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            ctx.write(msg);
+        }
+
+        @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) {
+            ctx.flush();
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+            // Close the connection when an exception is raised.
+            cause.printStackTrace();
+            ctx.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b95ac58d/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
new file mode 100644
index 0000000..2612c05
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/netty/NettyTcpTransportTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.test.netty;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.transports.NettyTcpTransport;
+import org.apache.qpid.jms.transports.TcpTransportOptions;
+import org.apache.qpid.jms.transports.TransportListener;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.vertx.java.core.buffer.Buffer;
+
+/**
+ * Test basic functionality of the Netty based TCP transport.
+ * <p>
+ * Each test starts a {@link NettyEchoServer}, connects a {@link NettyTcpTransport} to it
+ * and checks what the shared {@link TransportListener} recorded.
+ */
+public class NettyTcpTransportTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
+
+    // The listener below is invoked from the transport's channel handler callbacks rather
+    // than from the test thread, so the recorded state must be safely published: the flag
+    // is volatile and the lists are synchronized.
+    private volatile boolean transportClosed;
+    private final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
+    private final List<Buffer> data = Collections.synchronizedList(new ArrayList<Buffer>());
+
+    private final TransportListener testListener = new NettyTransportListener();
+    private final TcpTransportOptions testOptions = new TcpTransportOptions();
+
+    /**
+     * Connects to the echo server and closes the transport again, expecting no closed
+     * event, no errors and no data.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testConnectToServer() throws Exception {
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            transport.close();
+        }
+
+        assertFalse(transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+    }
+
+    /**
+     * Stops the server while the transport is connected and expects the transport to
+     * report the closure to its listener and to no longer claim to be connected.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testDetectServerClose() throws Exception {
+        NettyTcpTransport transport = null;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            server.close();
+        }
+
+        assertTrue(Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return transportClosed;
+            }
+        }));
+        assertTrue(exceptions.isEmpty());
+        assertTrue(data.isEmpty());
+        assertFalse(transport.isConnected());
+
+        try {
+            transport.close();
+        } catch (Exception ex) {
+            fail("Close of a disconnect transport should not generate errors");
+        }
+    }
+
+    /**
+     * Sends a block of bytes to the echo server and expects the same number of bytes to be
+     * delivered back to the listener.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testDataSentIsReceived() throws Exception {
+        final int SEND_BYTE_COUNT = 1024;
+
+        try (NettyEchoServer server = new NettyEchoServer()) {
+            server.start();
+
+            int port = server.getServerPort();
+            URI serverLocation = new URI("tcp://localhost:" + port);
+
+            NettyTcpTransport transport = new NettyTcpTransport(testListener, serverLocation, testOptions);
+            try {
+                transport.connect();
+                LOG.info("Connected to test server.");
+            } catch (Exception e) {
+                fail("Should have connected to the server");
+            }
+
+            assertTrue(transport.isConnected());
+
+            ByteBuf sendBuffer = Unpooled.buffer(SEND_BYTE_COUNT);
+            for (int i = 0; i < SEND_BYTE_COUNT; ++i) {
+                sendBuffer.writeByte('A');
+            }
+
+            transport.send(sendBuffer);
+
+            assertTrue(Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return !data.isEmpty();
+                }
+            }));
+
+            // NOTE(review): assumes the echoed bytes arrive as a single read - confirm.
+            assertEquals(SEND_BYTE_COUNT, data.get(0).length());
+
+            transport.close();
+        }
+
+        assertFalse(transportClosed);  // Normal shutdown does not trigger the event.
+        assertTrue(exceptions.isEmpty());
+    }
+
+    /**
+     * Listener that records every event it receives in the fields of the enclosing test.
+     */
+    private class NettyTransportListener implements TransportListener {
+
+        @Override
+        public void onData(Buffer incoming) {
+            LOG.info("Client has new incoming data of size: {}", incoming.length());
+            data.add(incoming);
+        }
+
+        @Override
+        public void onTransportClosed() {
+            LOG.info("Transport reports that it has closed.");
+            transportClosed = true;
+        }
+
+        @Override
+        public void onTransportError(Throwable cause) {
+            LOG.info("Transport error caught: {}", cause.getMessage());
+            exceptions.add(cause);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org