You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/13 22:02:43 UTC

[4/8] activemq-artemis git commit: ARTEMIS-1511 Refactor AMQP Transport for use with other test clients

ARTEMIS-1511 Refactor AMQP Transport for use with other test clients


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5211afdf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5211afdf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5211afdf

Branch: refs/heads/master
Commit: 5211afdf866fbcb5b538b2d5e2670dd5df385423
Parents: 63b156e
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Nov 10 12:31:29 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 13 16:55:47 2017 -0500

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpClient.java       |   4 +-
 .../transport/amqp/client/AmqpConnection.java   |   7 +-
 .../client/transport/NettyTcpTransport.java     | 460 -------------------
 .../amqp/client/transport/NettyTransport.java   |  56 ---
 .../client/transport/NettyTransportFactory.java |  83 ----
 .../transport/NettyTransportListener.java       |  48 --
 .../client/transport/NettyTransportOptions.java | 208 ---------
 .../transport/NettyTransportSslOptions.java     | 302 ------------
 .../client/transport/NettyTransportSupport.java | 304 ------------
 .../amqp/client/transport/NettyWSTransport.java | 171 -------
 .../client/transport/X509AliasKeyManager.java   |  86 ----
 .../transport/netty/NettyTcpTransport.java      | 460 +++++++++++++++++++
 .../transport/netty/NettyTransport.java         |  57 +++
 .../transport/netty/NettyTransportFactory.java  |  82 ++++
 .../transport/netty/NettyTransportListener.java |  48 ++
 .../transport/netty/NettyTransportOptions.java  | 219 +++++++++
 .../netty/NettyTransportSslOptions.java         | 302 ++++++++++++
 .../transport/netty/NettyTransportSupport.java  | 304 ++++++++++++
 .../transport/netty/NettyWSTransport.java       | 172 +++++++
 .../transport/netty/X509AliasKeyManager.java    |  86 ++++
 .../impl/netty/NettyHandshakeTimeoutTest.java   |   6 +-
 21 files changed, 1739 insertions(+), 1726 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index fddaf9d..d35d0ab 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -21,8 +21,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
+import org.apache.activemq.transport.netty.NettyTransport;
+import org.apache.activemq.transport.netty.NettyTransportFactory;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 2fc720a..01e2288 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -33,8 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.transport.InactivityIOException;
+import org.apache.activemq.transport.netty.NettyTransport;
 import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
+import org.apache.activemq.transport.netty.NettyTransportListener;
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.IdGenerator;
@@ -80,7 +81,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    private final AtomicLong sessionIdGenerator = new AtomicLong();
    private final AtomicLong txIdGenerator = new AtomicLong();
    private final Collector protonCollector = new CollectorImpl();
-   private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport;
+   private final NettyTransport transport;
    private final Transport protonTransport = Transport.Factory.create();
 
    private final String username;
@@ -109,7 +110,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    private boolean trace;
    private boolean noContainerID = false;
 
-   public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
+   public AmqpConnection(NettyTransport transport, String username, String password) {
       setEndpoint(Connection.Factory.create());
       getEndpoint().collect(protonCollector);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
deleted file mode 100644
index 7ce3bb9..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.security.Principal;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
-/**
- * TCP based transport that uses Netty as the underlying IO layer.
- */
-public class NettyTcpTransport implements NettyTransport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
-
-   private static final int SHUTDOWN_TIMEOUT = 100;
-   public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
-
-   protected Bootstrap bootstrap;
-   protected EventLoopGroup group;
-   protected Channel channel;
-   protected NettyTransportListener listener;
-   protected final NettyTransportOptions options;
-   protected final URI remote;
-   protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
-
-   private final AtomicBoolean connected = new AtomicBoolean();
-   private final AtomicBoolean closed = new AtomicBoolean();
-   private final CountDownLatch connectLatch = new CountDownLatch(1);
-   private volatile IOException failureCause;
-
-   /**
-    * Create a new transport instance
-    *
-    * @param remoteLocation
-    *        the URI that defines the remote resource to connect to.
-    * @param options
-    *        the transport options used to configure the socket connection.
-    */
-   public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
-      this(null, remoteLocation, options);
-   }
-
-   /**
-    * Create a new transport instance
-    *
-    * @param listener
-    *        the TransportListener that will receive events from this Transport.
-    * @param remoteLocation
-    *        the URI that defines the remote resource to connect to.
-    * @param options
-    *        the transport options used to configure the socket connection.
-    */
-   public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
-      if (options == null) {
-         throw new IllegalArgumentException("Transport Options cannot be null");
-      }
-
-      if (remoteLocation == null) {
-         throw new IllegalArgumentException("Transport remote location cannot be null");
-      }
-
-      this.options = options;
-      this.listener = listener;
-      this.remote = remoteLocation;
-   }
-
-   @Override
-   public void connect() throws IOException {
-
-      if (listener == null) {
-         throw new IllegalStateException("A transport listener must be set before connection attempts.");
-      }
-
-      final SslHandler sslHandler;
-      if (isSSL()) {
-         try {
-            sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
-         } catch (Exception ex) {
-            // TODO: can we stop it throwing Exception?
-            throw IOExceptionSupport.create(ex);
-         }
-      } else {
-         sslHandler = null;
-      }
-
-      group = new NioEventLoopGroup(1);
-
-      bootstrap = new Bootstrap();
-      bootstrap.group(group);
-      bootstrap.channel(NioSocketChannel.class);
-      bootstrap.handler(new ChannelInitializer<Channel>() {
-         @Override
-         public void initChannel(Channel connectedChannel) throws Exception {
-            configureChannel(connectedChannel, sslHandler);
-         }
-      });
-
-      configureNetty(bootstrap, getTransportOptions());
-
-      ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
-      future.addListener(new ChannelFutureListener() {
-
-         @Override
-         public void operationComplete(ChannelFuture future) throws Exception {
-            if (!future.isSuccess()) {
-               handleException(future.channel(), IOExceptionSupport.create(future.cause()));
-            }
-         }
-      });
-
-      try {
-         connectLatch.await();
-      } catch (InterruptedException ex) {
-         LOG.debug("Transport connection was interrupted.");
-         Thread.interrupted();
-         failureCause = IOExceptionSupport.create(ex);
-      }
-
-      if (failureCause != null) {
-         // Close out any Netty resources now as they are no longer needed.
-         if (channel != null) {
-            channel.close().syncUninterruptibly();
-            channel = null;
-         }
-         if (group != null) {
-            Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-            if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
-               LOG.trace("Channel group shutdown failed to complete in allotted time");
-            }
-            group = null;
-         }
-
-         throw failureCause;
-      } else {
-         // Connected, allow any held async error to fire now and close the transport.
-         channel.eventLoop().execute(new Runnable() {
-
-            @Override
-            public void run() {
-               if (failureCause != null) {
-                  channel.pipeline().fireExceptionCaught(failureCause);
-               }
-            }
-         });
-      }
-   }
-
-   @Override
-   public boolean isConnected() {
-      return connected.get();
-   }
-
-   @Override
-   public boolean isSSL() {
-      return options.isSSL();
-   }
-
-   @Override
-   public void close() throws IOException {
-      if (closed.compareAndSet(false, true)) {
-         connected.set(false);
-         try {
-            if (channel != null) {
-               channel.close().syncUninterruptibly();
-            }
-         } finally {
-            if (group != null) {
-               Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-               if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
-                  LOG.trace("Channel group shutdown failed to complete in allotted time");
-               }
-            }
-         }
-      }
-   }
-
-   @Override
-   public ByteBuf allocateSendBuffer(int size) throws IOException {
-      checkConnected();
-      return channel.alloc().ioBuffer(size, size);
-   }
-
-   @Override
-   public void send(ByteBuf output) throws IOException {
-      checkConnected();
-      int length = output.readableBytes();
-      if (length == 0) {
-         return;
-      }
-
-      LOG.trace("Attempted write of: {} bytes", length);
-
-      channel.writeAndFlush(output);
-   }
-
-   @Override
-   public NettyTransportListener getTransportListener() {
-      return listener;
-   }
-
-   @Override
-   public void setTransportListener(NettyTransportListener listener) {
-      this.listener = listener;
-   }
-
-   @Override
-   public NettyTransportOptions getTransportOptions() {
-      return options;
-   }
-
-   @Override
-   public URI getRemoteLocation() {
-      return remote;
-   }
-
-   @Override
-   public Principal getLocalPrincipal() {
-      Principal result = null;
-
-      if (isSSL()) {
-         SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
-         result = sslHandler.engine().getSession().getLocalPrincipal();
-      }
-
-      return result;
-   }
-
-   @Override
-   public void setMaxFrameSize(int maxFrameSize) {
-      if (connected.get()) {
-         throw new IllegalStateException("Cannot change Max Frame Size while connected.");
-      }
-
-      this.maxFrameSize = maxFrameSize;
-   }
-
-   @Override
-   public int getMaxFrameSize() {
-      return maxFrameSize;
-   }
-
-   // ----- Internal implementation details, can be overridden as needed -----//
-
-   protected String getRemoteHost() {
-      return remote.getHost();
-   }
-
-   protected int getRemotePort() {
-      if (remote.getPort() != -1) {
-         return remote.getPort();
-      } else {
-         return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
-      }
-   }
-
-   protected void addAdditionalHandlers(ChannelPipeline pipeline) {
-
-   }
-
-   protected ChannelInboundHandlerAdapter createChannelHandler() {
-      return new NettyTcpTransportHandler();
-   }
-
-   // ----- Event Handlers which can be overridden in subclasses -------------//
-
-   protected void handleConnected(Channel channel) throws Exception {
-      LOG.trace("Channel has become active! Channel is {}", channel);
-      connectionEstablished(channel);
-   }
-
-   protected void handleChannelInactive(Channel channel) throws Exception {
-      LOG.trace("Channel has gone inactive! Channel is {}", channel);
-      if (connected.compareAndSet(true, false) && !closed.get()) {
-         LOG.trace("Firing onTransportClosed listener");
-         listener.onTransportClosed();
-      }
-   }
-
-   protected void handleException(Channel channel, Throwable cause) throws Exception {
-      LOG.trace("Exception on channel! Channel is {}", channel);
-      if (connected.compareAndSet(true, false) && !closed.get()) {
-         LOG.trace("Firing onTransportError listener");
-         if (failureCause != null) {
-            listener.onTransportError(failureCause);
-         } else {
-            listener.onTransportError(cause);
-         }
-      } else {
-         // Hold the first failure for later dispatch if connect succeeds.
-         // This will then trigger disconnect using the first error reported.
-         if (failureCause == null) {
-            LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
-            failureCause = IOExceptionSupport.create(cause);
-         }
-
-         connectionFailed(channel, failureCause);
-      }
-   }
-
-   // ----- State change handlers and checks ---------------------------------//
-
-   protected final void checkConnected() throws IOException {
-      if (!connected.get()) {
-         throw new IOException("Cannot send to a non-connected transport.");
-      }
-   }
-
-   /*
-    * Called when the transport has successfully connected and is ready for use.
-    */
-   private void connectionEstablished(Channel connectedChannel) {
-      channel = connectedChannel;
-      connected.set(true);
-      connectLatch.countDown();
-   }
-
-   /*
-    * Called when the transport connection failed and an error should be returned.
-    */
-   private void connectionFailed(Channel failedChannel, IOException cause) {
-      failureCause = cause;
-      channel = failedChannel;
-      connected.set(false);
-      connectLatch.countDown();
-   }
-
-   private NettyTransportSslOptions getSslOptions() {
-      return (NettyTransportSslOptions) getTransportOptions();
-   }
-
-   private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
-      bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
-      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
-      bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
-      bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-
-      if (options.getSendBufferSize() != -1) {
-         bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
-      }
-
-      if (options.getReceiveBufferSize() != -1) {
-         bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
-         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
-      }
-
-      if (options.getTrafficClass() != -1) {
-         bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
-      }
-   }
-
-   private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
-      if (isSSL()) {
-         channel.pipeline().addLast(sslHandler);
-      }
-
-      if (getTransportOptions().isTraceBytes()) {
-         channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
-      }
-
-      addAdditionalHandlers(channel.pipeline());
-
-      channel.pipeline().addLast(createChannelHandler());
-   }
-
-   // ----- Handle connection events -----------------------------------------//
-
-   protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
-
-      @Override
-      public void channelRegistered(ChannelHandlerContext context) throws Exception {
-         channel = context.channel();
-      }
-
-      @Override
-      public void channelActive(ChannelHandlerContext context) throws Exception {
-         // In the Secure case we need to let the handshake complete before we
-         // trigger the connected event.
-         if (!isSSL()) {
-            handleConnected(context.channel());
-         } else {
-            SslHandler sslHandler = context.pipeline().get(SslHandler.class);
-            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
-               @Override
-               public void operationComplete(Future<Channel> future) throws Exception {
-                  if (future.isSuccess()) {
-                     LOG.trace("SSL Handshake has completed: {}", channel);
-                     handleConnected(channel);
-                  } else {
-                     LOG.trace("SSL Handshake has failed: {}", channel);
-                     handleException(channel, future.cause());
-                  }
-               }
-            });
-         }
-      }
-
-      @Override
-      public void channelInactive(ChannelHandlerContext context) throws Exception {
-         handleChannelInactive(context.channel());
-      }
-
-      @Override
-      public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
-         handleException(context.channel(), cause);
-      }
-   }
-
-   // ----- Handle Binary data from connection -------------------------------//
-
-   protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
-
-      @Override
-      protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
-         LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
-         listener.onData(buffer);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
deleted file mode 100644
index 4d5a389..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.security.Principal;
-
-import io.netty.buffer.ByteBuf;
-
-/**
- * Base for all Netty based Transports in this client.
- */
-public interface NettyTransport {
-
-   void connect() throws IOException;
-
-   boolean isConnected();
-
-   boolean isSSL();
-
-   void close() throws IOException;
-
-   ByteBuf allocateSendBuffer(int size) throws IOException;
-
-   void send(ByteBuf output) throws IOException;
-
-   NettyTransportListener getTransportListener();
-
-   void setTransportListener(NettyTransportListener listener);
-
-   NettyTransportOptions getTransportOptions();
-
-   URI getRemoteLocation();
-
-   Principal getLocalPrincipal();
-
-   void setMaxFrameSize(int maxFrameSize);
-
-   int getMaxFrameSize();
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
deleted file mode 100644
index 30b2e21..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.apache.activemq.transport.amqp.client.util.PropertyUtil;
-
-/**
- * Factory for creating the Netty based TCP Transport.
- */
-public final class NettyTransportFactory {
-
-   private NettyTransportFactory() {
-   }
-
-   /**
-    * Creates an instance of the given Transport and configures it using the properties set on
-    * the given remote broker URI.
-    *
-    * @param remoteURI
-    *        The URI used to connect to a remote Peer.
-    *
-    * @return a new Transport instance.
-    *
-    * @throws Exception
-    *         if an error occurs while creating the Transport instance.
-    */
-   public static NettyTransport createTransport(URI remoteURI) throws Exception {
-      Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
-      Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport.");
-      NettyTransportOptions transportOptions = null;
-
-      remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
-
-      if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) {
-         transportOptions = NettyTransportOptions.INSTANCE.clone();
-      } else {
-         transportOptions = NettyTransportSslOptions.INSTANCE.clone();
-      }
-
-      Map<String, String> unused = PropertyUtil.setProperties(transportOptions, transportURIOptions);
-      if (!unused.isEmpty()) {
-         String msg = " Not all transport options could be set on the TCP based" +
-            " Transport. Check the options are spelled correctly." +
-            " Unused parameters=[" + unused + "]." +
-            " This provider instance cannot be started.";
-         throw new IllegalArgumentException(msg);
-      }
-
-      NettyTransport result = null;
-
-      switch (remoteURI.getScheme().toLowerCase()) {
-         case "tcp":
-         case "ssl":
-            result = new NettyTcpTransport(remoteURI, transportOptions);
-            break;
-         case "ws":
-         case "wss":
-            result = new NettyWSTransport(remoteURI, transportOptions);
-            break;
-         default:
-            throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
-      }
-
-      return result;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
deleted file mode 100644
index 0163517..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import io.netty.buffer.ByteBuf;
-
-/**
- * Listener interface that should be implemented by users of the various QpidJMS Transport
- * classes.
- */
-public interface NettyTransportListener {
-
-   /**
-    * Called when new incoming data has become available.
-    *
-    * @param incoming
-    *        the next incoming packet of data.
-    */
-   void onData(ByteBuf incoming);
-
-   /**
-    * Called if the connection state becomes closed.
-    */
-   void onTransportClosed();
-
-   /**
-    * Called when an error occurs during normal Transport operations.
-    *
-    * @param cause
-    *        the error that triggered this event.
-    */
-   void onTransportError(Throwable cause);
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
deleted file mode 100644
index c5022c1..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-/**
- * Encapsulates all the TCP Transport options in one configuration object.
- */
-public class NettyTransportOptions implements Cloneable {
-
-   public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
-   public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
-   public static final int DEFAULT_TRAFFIC_CLASS = 0;
-   public static final boolean DEFAULT_TCP_NO_DELAY = true;
-   public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
-   public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
-   public static final int DEFAULT_SO_TIMEOUT = -1;
-   public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
-   public static final int DEFAULT_TCP_PORT = 5672;
-   public static final boolean DEFAULT_TRACE_BYTES = false;
-
-   public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
-
-   private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
-   private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
-   private int trafficClass = DEFAULT_TRAFFIC_CLASS;
-   private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
-   private int soTimeout = DEFAULT_SO_TIMEOUT;
-   private int soLinger = DEFAULT_SO_LINGER;
-   private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
-   private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
-   private int defaultTcpPort = DEFAULT_TCP_PORT;
-   private boolean traceBytes = DEFAULT_TRACE_BYTES;
-
-   /**
-    * @return the currently set send buffer size in bytes.
-    */
-   public int getSendBufferSize() {
-      return sendBufferSize;
-   }
-
-   /**
-    * Sets the send buffer size in bytes, the value must be greater than zero or an
-    * {@link IllegalArgumentException} will be thrown.
-    *
-    * @param sendBufferSize
-    *        the new send buffer size for the TCP Transport.
-    *
-    * @throws IllegalArgumentException
-    *         if the value given is not in the valid range.
-    */
-   public void setSendBufferSize(int sendBufferSize) {
-      if (sendBufferSize <= 0) {
-         throw new IllegalArgumentException("The send buffer size must be > 0");
-      }
-
-      this.sendBufferSize = sendBufferSize;
-   }
-
-   /**
-    * @return the currently configured receive buffer size in bytes.
-    */
-   public int getReceiveBufferSize() {
-      return receiveBufferSize;
-   }
-
-   /**
-    * Sets the receive buffer size in bytes, the value must be greater than zero or an
-    * {@link IllegalArgumentException} will be thrown.
-    *
-    * @param receiveBufferSize
-    *        the new receive buffer size for the TCP Transport.
-    *
-    * @throws IllegalArgumentException
-    *         if the value given is not greater than zero.
-    */
-   public void setReceiveBufferSize(int receiveBufferSize) {
-      if (receiveBufferSize <= 0) {
-         throw new IllegalArgumentException("The receive buffer size must be > 0");
-      }
-
-      this.receiveBufferSize = receiveBufferSize;
-   }
-
-   /**
-    * @return the currently configured traffic class value.
-    */
-   public int getTrafficClass() {
-      return trafficClass;
-   }
-
-   /**
-    * Sets the traffic class value used by the TCP connection, valid range is between 0 and 255.
-    *
-    * @param trafficClass
-    *        the new traffic class value.
-    *
-    * @throws IllegalArgumentException
-    *         if the value given is not in the valid range.
-    */
-   public void setTrafficClass(int trafficClass) {
-      if (trafficClass < 0 || trafficClass > 255) {
-         throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
-      }
-
-      this.trafficClass = trafficClass;
-   }
-
-   public int getSoTimeout() {
-      return soTimeout;
-   }
-
-   public void setSoTimeout(int soTimeout) {
-      this.soTimeout = soTimeout;
-   }
-
-   public boolean isTcpNoDelay() {
-      return tcpNoDelay;
-   }
-
-   public void setTcpNoDelay(boolean tcpNoDelay) {
-      this.tcpNoDelay = tcpNoDelay;
-   }
-
-   public int getSoLinger() {
-      return soLinger;
-   }
-
-   public void setSoLinger(int soLinger) {
-      this.soLinger = soLinger;
-   }
-
-   public boolean isTcpKeepAlive() {
-      return tcpKeepAlive;
-   }
-
-   public void setTcpKeepAlive(boolean keepAlive) {
-      this.tcpKeepAlive = keepAlive;
-   }
-
-   public int getConnectTimeout() {
-      return connectTimeout;
-   }
-
-   public void setConnectTimeout(int connectTimeout) {
-      this.connectTimeout = connectTimeout;
-   }
-
-   public int getDefaultTcpPort() {
-      return defaultTcpPort;
-   }
-
-   public void setDefaultTcpPort(int defaultTcpPort) {
-      this.defaultTcpPort = defaultTcpPort;
-   }
-
-   /**
-    * @return true if the transport should enable byte tracing
-    */
-   public boolean isTraceBytes() {
-      return traceBytes;
-   }
-
-   /**
-    * Determines if the transport should add a logger for bytes in / out
-    *
-    * @param traceBytes
-    *        should the transport log the bytes in and out.
-    */
-   public void setTraceBytes(boolean traceBytes) {
-      this.traceBytes = traceBytes;
-   }
-
-   public boolean isSSL() {
-      return false;
-   }
-
-   @Override
-   public NettyTransportOptions clone() {
-      return copyOptions(new NettyTransportOptions());
-   }
-
-   /**
-    * Copies the option values held by this instance into the given target instance.
-    *
-    * @param copy
-    *        the options instance that receives the values.
-    *
-    * @return the instance that was passed in, after its values have been set.
-    */
-   protected NettyTransportOptions copyOptions(NettyTransportOptions copy) {
-      copy.setConnectTimeout(getConnectTimeout());
-      copy.setReceiveBufferSize(getReceiveBufferSize());
-      copy.setSendBufferSize(getSendBufferSize());
-      copy.setSoLinger(getSoLinger());
-      copy.setSoTimeout(getSoTimeout());
-      copy.setTcpKeepAlive(isTcpKeepAlive());
-      copy.setTcpNoDelay(isTcpNoDelay());
-      copy.setTrafficClass(getTrafficClass());
-      // These two were previously left out, so clone() reset them to their defaults.
-      copy.setDefaultTcpPort(getDefaultTcpPort());
-      copy.setTraceBytes(isTraceBytes());
-
-      return copy;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
deleted file mode 100644
index 3289fce..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Holds the defined SSL options for connections that operate over a secure transport. Options
- * are read from the environment and can be overridden by specifying them on the connection URI.
- */
-public class NettyTransportSslOptions extends NettyTransportOptions {
-
-   public static final String DEFAULT_STORE_TYPE = "jks";
-   public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
-   public static final boolean DEFAULT_TRUST_ALL = false;
-   public static final boolean DEFAULT_VERIFY_HOST = false;
-   public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"}));
-   public static final int DEFAULT_SSL_PORT = 5671;
-
-   public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
-
-   private String keyStoreLocation;
-   private String keyStorePassword;
-   private String trustStoreLocation;
-   private String trustStorePassword;
-   private String storeType = DEFAULT_STORE_TYPE;
-   private String[] enabledCipherSuites;
-   private String[] disabledCipherSuites;
-   private String[] enabledProtocols;
-   private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]);
-   private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL;
-
-   private boolean trustAll = DEFAULT_TRUST_ALL;
-   private boolean verifyHost = DEFAULT_VERIFY_HOST;
-   private String keyAlias;
-   private int defaultSslPort = DEFAULT_SSL_PORT;
-
-   static {
-      // Seed the shared default instance from the standard JSSE system properties.
-      INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
-      INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
-      INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore"));
-      // The trust store has its own password property; the key store password was read here before.
-      INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.trustStorePassword"));
-   }
-
-   /**
-    * @return the keyStoreLocation currently configured.
-    */
-   public String getKeyStoreLocation() {
-      return keyStoreLocation;
-   }
-
-   /**
-    * Sets the location on disk of the key store to use.
-    *
-    * @param keyStoreLocation
-    *        the keyStoreLocation to use to create the key manager.
-    */
-   public void setKeyStoreLocation(String keyStoreLocation) {
-      this.keyStoreLocation = keyStoreLocation;
-   }
-
-   /**
-    * @return the keyStorePassword
-    */
-   public String getKeyStorePassword() {
-      return keyStorePassword;
-   }
-
-   /**
-    * @param keyStorePassword
-    *        the keyStorePassword to set
-    */
-   public void setKeyStorePassword(String keyStorePassword) {
-      this.keyStorePassword = keyStorePassword;
-   }
-
-   /**
-    * @return the trustStoreLocation
-    */
-   public String getTrustStoreLocation() {
-      return trustStoreLocation;
-   }
-
-   /**
-    * @param trustStoreLocation
-    *        the trustStoreLocation to set
-    */
-   public void setTrustStoreLocation(String trustStoreLocation) {
-      this.trustStoreLocation = trustStoreLocation;
-   }
-
-   /**
-    * @return the trustStorePassword
-    */
-   public String getTrustStorePassword() {
-      return trustStorePassword;
-   }
-
-   /**
-    * @param trustStorePassword
-    *        the trustStorePassword to set
-    */
-   public void setTrustStorePassword(String trustStorePassword) {
-      this.trustStorePassword = trustStorePassword;
-   }
-
-   /**
-    * @return the storeType
-    */
-   public String getStoreType() {
-      return storeType;
-   }
-
-   /**
-    * @param storeType
-    *        the format that the store files are encoded in.
-    */
-   public void setStoreType(String storeType) {
-      this.storeType = storeType;
-   }
-
-   /**
-    * @return the enabledCipherSuites
-    */
-   public String[] getEnabledCipherSuites() {
-      return enabledCipherSuites;
-   }
-
-   /**
-    * @param enabledCipherSuites
-    *        the enabledCipherSuites to set
-    */
-   public void setEnabledCipherSuites(String[] enabledCipherSuites) {
-      this.enabledCipherSuites = enabledCipherSuites;
-   }
-
-   /**
-    * @return the disabledCipherSuites
-    */
-   public String[] getDisabledCipherSuites() {
-      return disabledCipherSuites;
-   }
-
-   /**
-    * @param disabledCipherSuites
-    *        the disabledCipherSuites to set
-    */
-   public void setDisabledCipherSuites(String[] disabledCipherSuites) {
-      this.disabledCipherSuites = disabledCipherSuites;
-   }
-
-   /**
-    * @return the enabledProtocols or null if the defaults should be used
-    */
-   public String[] getEnabledProtocols() {
-      return enabledProtocols;
-   }
-
-   /**
-    * The protocols to be set as enabled.
-    *
-    * @param enabledProtocols
-    *        the enabled protocols to set, or null if the defaults should be used.
-    */
-   public void setEnabledProtocols(String[] enabledProtocols) {
-      this.enabledProtocols = enabledProtocols;
-   }
-
-   /**
-    *
-    * @return the protocols to disable or null if none should be
-    */
-   public String[] getDisabledProtocols() {
-      return disabledProtocols;
-   }
-
-   /**
-    * The protocols to be disable.
-    *
-    * @param disabledProtocols
-    *        the protocols to disable, or null if none should be.
-    */
-   public void setDisabledProtocols(String[] disabledProtocols) {
-      this.disabledProtocols = disabledProtocols;
-   }
-
-   /**
-    * @return the context protocol to use
-    */
-   public String getContextProtocol() {
-      return contextProtocol;
-   }
-
-   /**
-    * The protocol value to use when creating an SSLContext via
-    * SSLContext.getInstance(protocol).
-    *
-    * @param contextProtocol
-    *        the context protocol to use.
-    */
-   public void setContextProtocol(String contextProtocol) {
-      this.contextProtocol = contextProtocol;
-   }
-
-   /**
-    * @return the trustAll
-    */
-   public boolean isTrustAll() {
-      return trustAll;
-   }
-
-   /**
-    * @param trustAll
-    *        the trustAll to set
-    */
-   public void setTrustAll(boolean trustAll) {
-      this.trustAll = trustAll;
-   }
-
-   /**
-    * @return the verifyHost
-    */
-   public boolean isVerifyHost() {
-      return verifyHost;
-   }
-
-   /**
-    * @param verifyHost
-    *        the verifyHost to set
-    */
-   public void setVerifyHost(boolean verifyHost) {
-      this.verifyHost = verifyHost;
-   }
-
-   /**
-    * @return the key alias
-    */
-   public String getKeyAlias() {
-      return keyAlias;
-   }
-
-   /**
-    * @param keyAlias
-    *        the key alias to use
-    */
-   public void setKeyAlias(String keyAlias) {
-      this.keyAlias = keyAlias;
-   }
-
-   public int getDefaultSslPort() {
-      return defaultSslPort;
-   }
-
-   public void setDefaultSslPort(int defaultSslPort) {
-      this.defaultSslPort = defaultSslPort;
-   }
-
-   @Override
-   public boolean isSSL() {
-      return true;
-   }
-
-   @Override
-   public NettyTransportSslOptions clone() {
-      return copyOptions(new NettyTransportSslOptions());
-   }
-
-   /**
-    * Copies the base transport options and then the SSL specific option values held by this
-    * instance into the given target instance.
-    *
-    * @param copy
-    *        the SSL options instance that receives the values.
-    *
-    * @return the instance that was passed in, after its values have been set.
-    */
-   protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) {
-      super.copyOptions(copy);
-
-      copy.setKeyStoreLocation(getKeyStoreLocation());
-      copy.setKeyStorePassword(getKeyStorePassword());
-      copy.setTrustStoreLocation(getTrustStoreLocation());
-      copy.setTrustStorePassword(getTrustStorePassword());
-      copy.setStoreType(getStoreType());
-      copy.setEnabledCipherSuites(getEnabledCipherSuites());
-      copy.setDisabledCipherSuites(getDisabledCipherSuites());
-      copy.setEnabledProtocols(getEnabledProtocols());
-      copy.setDisabledProtocols(getDisabledProtocols());
-      copy.setTrustAll(isTrustAll());
-      copy.setVerifyHost(isVerifyHost());
-      copy.setKeyAlias(getKeyAlias());
-      copy.setContextProtocol(getContextProtocol());
-      // Previously left out, so clone() reset the SSL port to its default.
-      copy.setDefaultSslPort(getDefaultSslPort());
-      return copy;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
deleted file mode 100644
index d41c669..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URI;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.SecureRandom;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509ExtendedKeyManager;
-import javax.net.ssl.X509TrustManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.handler.ssl.SslHandler;
-
-/**
- * Static class that provides various utility methods used by Transport implementations.
- */
-public class NettyTransportSupport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
-
-   /**
-    * Creates a Netty SslHandler instance for use in Transports that require an SSL encoder /
-    * decoder.
-    *
-    * @param remote
-    *        The URI of the remote peer that the SslHandler will be used against.
-    * @param options
-    *        The SSL options object to build the SslHandler instance from.
-    *
-    * @return a new SslHandler that is configured from the given options.
-    *
-    * @throws Exception
-    *         if an error occurs while creating the SslHandler instance.
-    */
-   public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
-      return new SslHandler(createSslEngine(remote, createSslContext(options), options));
-   }
-
-   /**
-    * Create a new SSLContext using the options specific in the given TransportSslOptions
-    * instance.
-    *
-    * @param options
-    *        the configured options used to create the SSLContext.
-    *
-    * @return a new SSLContext instance.
-    *
-    * @throws Exception
-    *         if an error occurs while creating the context.
-    */
-   public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
-      try {
-         String contextProtocol = options.getContextProtocol();
-         LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol);
-
-         SSLContext context = SSLContext.getInstance(contextProtocol);
-         KeyManager[] keyMgrs = loadKeyManagers(options);
-         TrustManager[] trustManagers = loadTrustManagers(options);
-
-         context.init(keyMgrs, trustManagers, new SecureRandom());
-         return context;
-      } catch (Exception e) {
-         LOG.error("Failed to create SSLContext: {}", e, e);
-         throw e;
-      }
-   }
-
-   /**
-    * Create a new SSLEngine instance in client mode from the given SSLContext and
-    * TransportSslOptions instances.
-    *
-    * @param context
-    *        the SSLContext to use when creating the engine.
-    * @param options
-    *        the TransportSslOptions to use to configure the new SSLEngine.
-    *
-    * @return a new SSLEngine instance in client mode.
-    *
-    * @throws Exception
-    *         if an error occurs while creating the new SSLEngine.
-    */
-   public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
-      return createSslEngine(null, context, options);
-   }
-
-   /**
-    * Create a new SSLEngine instance in client mode from the given SSLContext and
-    * TransportSslOptions instances.
-    *
-    * @param remote
-    *        the URI of the remote peer that will be used to initialize the engine, may be null
-    *        if none should.
-    * @param context
-    *        the SSLContext to use when creating the engine.
-    * @param options
-    *        the TransportSslOptions to use to configure the new SSLEngine.
-    *
-    * @return a new SSLEngine instance in client mode.
-    *
-    * @throws Exception
-    *         if an error occurs while creating the new SSLEngine.
-    */
-   public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception {
-      SSLEngine engine = null;
-      if (remote == null) {
-         engine = context.createSSLEngine();
-      } else {
-         engine = context.createSSLEngine(remote.getHost(), remote.getPort());
-      }
-
-      engine.setEnabledProtocols(buildEnabledProtocols(engine, options));
-      engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options));
-      engine.setUseClientMode(true);
-
-      if (options.isVerifyHost()) {
-         SSLParameters sslParameters = engine.getSSLParameters();
-         sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
-         engine.setSSLParameters(sslParameters);
-      }
-
-      return engine;
-   }
-
-   private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) {
-      List<String> enabledProtocols = new ArrayList<>();
-
-      if (options.getEnabledProtocols() != null) {
-         List<String> configuredProtocols = Arrays.asList(options.getEnabledProtocols());
-         LOG.trace("Configured protocols from transport options: {}", configuredProtocols);
-         enabledProtocols.addAll(configuredProtocols);
-      } else {
-         List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols());
-         LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols);
-         enabledProtocols.addAll(engineProtocols);
-      }
-
-      String[] disabledProtocols = options.getDisabledProtocols();
-      if (disabledProtocols != null) {
-         List<String> disabled = Arrays.asList(disabledProtocols);
-         LOG.trace("Disabled protocols: {}", disabled);
-         enabledProtocols.removeAll(disabled);
-      }
-
-      LOG.trace("Enabled protocols: {}", enabledProtocols);
-
-      return enabledProtocols.toArray(new String[0]);
-   }
-
-   private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) {
-      List<String> enabledCipherSuites = new ArrayList<>();
-
-      if (options.getEnabledCipherSuites() != null) {
-         List<String> configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites());
-         LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites);
-         enabledCipherSuites.addAll(configuredCipherSuites);
-      } else {
-         List<String> engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
-         LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites);
-         enabledCipherSuites.addAll(engineCipherSuites);
-      }
-
-      String[] disabledCipherSuites = options.getDisabledCipherSuites();
-      if (disabledCipherSuites != null) {
-         List<String> disabled = Arrays.asList(disabledCipherSuites);
-         LOG.trace("Disabled cipher suites: {}", disabled);
-         enabledCipherSuites.removeAll(disabled);
-      }
-
-      LOG.trace("Enabled cipher suites: {}", enabledCipherSuites);
-
-      return enabledCipherSuites.toArray(new String[0]);
-   }
-
-   private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
-      if (options.isTrustAll()) {
-         return new TrustManager[] {createTrustAllTrustManager()};
-      }
-
-      if (options.getTrustStoreLocation() == null) {
-         return null;
-      }
-
-      TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-
-      String storeLocation = options.getTrustStoreLocation();
-      String storePassword = options.getTrustStorePassword();
-      String storeType = options.getStoreType();
-
-      LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType);
-
-      KeyStore trustStore = loadStore(storeLocation, storePassword, storeType);
-      fact.init(trustStore);
-
-      return fact.getTrustManagers();
-   }
-
-   private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception {
-      if (options.getKeyStoreLocation() == null) {
-         return null;
-      }
-
-      KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-
-      String storeLocation = options.getKeyStoreLocation();
-      String storePassword = options.getKeyStorePassword();
-      String storeType = options.getStoreType();
-      String alias = options.getKeyAlias();
-
-      LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
-
-      KeyStore keyStore = loadStore(storeLocation, storePassword, storeType);
-      fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null);
-
-      if (alias == null) {
-         return fact.getKeyManagers();
-      } else {
-         validateAlias(keyStore, alias);
-         return wrapKeyManagers(alias, fact.getKeyManagers());
-      }
-   }
-
-   private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) {
-      KeyManager[] keyManagers = new KeyManager[origKeyManagers.length];
-      for (int i = 0; i < origKeyManagers.length; i++) {
-         KeyManager km = origKeyManagers[i];
-         if (km instanceof X509ExtendedKeyManager) {
-            km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km);
-         }
-
-         keyManagers[i] = km;
-      }
-
-      return keyManagers;
-   }
-
-   private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException {
-      if (!store.containsAlias(alias)) {
-         throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store");
-      }
-
-      if (!store.isKeyEntry(alias)) {
-         throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry");
-      }
-   }
-
-   private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception {
-      KeyStore store = KeyStore.getInstance(storeType);
-      try (InputStream in = new FileInputStream(new File(storePath));) {
-         store.load(in, password != null ? password.toCharArray() : null);
-      }
-
-      return store;
-   }
-
-   private static TrustManager createTrustAllTrustManager() {
-      return new X509TrustManager() {
-         @Override
-         public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
-         }
-
-         @Override
-         public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
-         }
-
-         @Override
-         public X509Certificate[] getAcceptedIssuers() {
-            return new X509Certificate[0];
-         }
-      };
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
deleted file mode 100644
index 9b0e6e2..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http.DefaultHttpHeaders;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-
-/**
- * Transport for communicating over WebSockets
- */
-public class NettyWSTransport extends NettyTcpTransport {
-
-   private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
-
-   private static final String AMQP_SUB_PROTOCOL = "amqp";
-
-   /**
-    * Create a new transport instance
-    *
-    * @param remoteLocation
-    *        the URI that defines the remote resource to connect to.
-    * @param options
-    *        the transport options used to configure the socket connection.
-    */
-   public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
-      this(null, remoteLocation, options);
-   }
-
-   /**
-    * Create a new transport instance
-    *
-    * @param listener
-    *        the TransportListener that will receive events from this Transport.
-    * @param remoteLocation
-    *        the URI that defines the remote resource to connect to.
-    * @param options
-    *        the transport options used to configure the socket connection.
-    */
-   public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
-      super(listener, remoteLocation, options);
-   }
-
-   @Override
-   public void send(ByteBuf output) throws IOException {
-      checkConnected();
-      int length = output.readableBytes();
-      if (length == 0) {
-         return;
-      }
-
-      LOG.trace("Attempted write of: {} bytes", length);
-
-      channel.writeAndFlush(new BinaryWebSocketFrame(output));
-   }
-
-   @Override
-   protected ChannelInboundHandlerAdapter createChannelHandler() {
-      return new NettyWebSocketTransportHandler();
-   }
-
-   @Override
-   protected void addAdditionalHandlers(ChannelPipeline pipeline) {
-      pipeline.addLast(new HttpClientCodec());
-      pipeline.addLast(new HttpObjectAggregator(8192));
-   }
-
-   @Override
-   protected void handleConnected(Channel channel) throws Exception {
-      LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel);
-   }
-
-   // ----- Handle connection events -----------------------------------------//
-
-   private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> {
-
-      private final WebSocketClientHandshaker handshaker;
-
-      NettyWebSocketTransportHandler() {
-         handshaker = WebSocketClientHandshakerFactory.newHandshaker(
-            getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL,
-            true, new DefaultHttpHeaders(), getMaxFrameSize());
-      }
-
-      @Override
-      public void channelActive(ChannelHandlerContext context) throws Exception {
-         handshaker.handshake(context.channel());
-
-         super.channelActive(context);
-      }
-
-      @Override
-      protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
-         LOG.trace("New data read: incoming: {}", message);
-
-         Channel ch = ctx.channel();
-         if (!handshaker.isHandshakeComplete()) {
-            handshaker.finishHandshake(ch, (FullHttpResponse) message);
-            LOG.trace("WebSocket Client connected! {}", ctx.channel());
-            // Now trigger super processing as we are really connected.
-            NettyWSTransport.super.handleConnected(ch);
-            return;
-         }
-
-         // We shouldn't get this since we handle the handshake previously.
-         if (message instanceof FullHttpResponse) {
-            FullHttpResponse response = (FullHttpResponse) message;
-            throw new IllegalStateException(
-               "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
-         }
-
-         WebSocketFrame frame = (WebSocketFrame) message;
-         if (frame instanceof TextWebSocketFrame) {
-            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
-            LOG.warn("WebSocket Client received message: " + textFrame.text());
-            ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
-         } else if (frame instanceof BinaryWebSocketFrame) {
-            BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
-            LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
-            listener.onData(binaryFrame.content());
-         } else if (frame instanceof ContinuationWebSocketFrame) {
-            ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame;
-            LOG.trace("WebSocket Client received data continuation: {} bytes", continuationFrame.content().readableBytes());
-            listener.onData(continuationFrame.content());
-         } else if (frame instanceof PingWebSocketFrame) {
-            LOG.trace("WebSocket Client received ping, response with pong");
-            ch.write(new PongWebSocketFrame(frame.content()));
-         } else if (frame instanceof CloseWebSocketFrame) {
-            LOG.trace("WebSocket Client received closing");
-            ch.close();
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
deleted file mode 100644
index 42d6a0b..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.X509ExtendedKeyManager;
-import java.net.Socket;
-import java.security.Principal;
-import java.security.PrivateKey;
-import java.security.cert.X509Certificate;
-
-/**
- * An X509ExtendedKeyManager wrapper which always chooses and only
- * returns the given alias, and defers retrieval to the delegate
- * key manager.
- */
-public class X509AliasKeyManager extends X509ExtendedKeyManager {
-
-   private X509ExtendedKeyManager delegate;
-   private String alias;
-
-   public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException {
-      if (alias == null) {
-         throw new IllegalArgumentException("The given key alias must not be null.");
-      }
-
-      this.alias = alias;
-      this.delegate = delegate;
-   }
-
-   @Override
-   public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
-      return alias;
-   }
-
-   @Override
-   public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
-      return alias;
-   }
-
-   @Override
-   public X509Certificate[] getCertificateChain(String alias) {
-      return delegate.getCertificateChain(alias);
-   }
-
-   @Override
-   public String[] getClientAliases(String keyType, Principal[] issuers) {
-      return new String[]{alias};
-   }
-
-   @Override
-   public PrivateKey getPrivateKey(String alias) {
-      return delegate.getPrivateKey(alias);
-   }
-
-   @Override
-   public String[] getServerAliases(String keyType, Principal[] issuers) {
-      return new String[]{alias};
-   }
-
-   @Override
-   public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
-      return alias;
-   }
-
-   @Override
-   public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
-      return alias;
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
new file mode 100644
index 0000000..9eab670
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.netty;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ * <p>
+ * A transport is created for a remote {@link URI} and a set of {@link NettyTransportOptions}.
+ * {@link #connect()} blocks the calling thread until the connection attempt (including the SSL
+ * handshake when SSL is enabled) has either succeeded or failed. Incoming data, transport errors
+ * and remote closure are reported to the configured {@link NettyTransportListener}.
+ */
+public class NettyTcpTransport implements NettyTransport {
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+   // Quiet-period upper bound (milliseconds) handed to the event loop group on shutdown.
+   private static final int SHUTDOWN_TIMEOUT = 100;
+   public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
+
+   protected Bootstrap bootstrap;
+   protected EventLoopGroup group;
+   // May be null before a connection attempt and after a failed one.
+   protected Channel channel;
+   protected NettyTransportListener listener;
+   protected final NettyTransportOptions options;
+   protected final URI remote;
+   protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+
+   private final AtomicBoolean connected = new AtomicBoolean();
+   private final AtomicBoolean closed = new AtomicBoolean();
+   // Released by connectionEstablished() or connectionFailed(), whichever happens first.
+   private final CountDownLatch connectLatch = new CountDownLatch(1);
+   // First failure seen; written from the event loop and read by the connecting thread.
+   private volatile IOException failureCause;
+
+   /**
+    * Create a new transport instance
+    *
+    * @param remoteLocation
+    *        the URI that defines the remote resource to connect to.
+    * @param options
+    *        the transport options used to configure the socket connection.
+    */
+   public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
+      this(null, remoteLocation, options);
+   }
+
+   /**
+    * Create a new transport instance
+    *
+    * @param listener
+    *        the TransportListener that will receive events from this Transport.
+    * @param remoteLocation
+    *        the URI that defines the remote resource to connect to.
+    * @param options
+    *        the transport options used to configure the socket connection.
+    *
+    * @throws IllegalArgumentException
+    *         if either the options or the remote location is null.
+    */
+   public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+      if (options == null) {
+         throw new IllegalArgumentException("Transport Options cannot be null");
+      }
+
+      if (remoteLocation == null) {
+         throw new IllegalArgumentException("Transport remote location cannot be null");
+      }
+
+      this.options = options;
+      this.listener = listener;
+      this.remote = remoteLocation;
+   }
+
+   /**
+    * Opens the connection to the remote and blocks until the attempt succeeds or fails.
+    *
+    * @throws IOException
+    *         if the SSL handler cannot be created, the connection attempt fails, or the calling
+    *         thread is interrupted while waiting (the interrupt status is restored in that case).
+    * @throws IllegalStateException
+    *         if no transport listener has been set.
+    */
+   @Override
+   public void connect() throws IOException {
+
+      if (listener == null) {
+         throw new IllegalStateException("A transport listener must be set before connection attempts.");
+      }
+
+      final SslHandler sslHandler;
+      if (isSSL()) {
+         try {
+            sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+         } catch (Exception ex) {
+            // TODO: can we stop it throwing Exception?
+            throw IOExceptionSupport.create(ex);
+         }
+      } else {
+         sslHandler = null;
+      }
+
+      group = new NioEventLoopGroup(1);
+
+      bootstrap = new Bootstrap();
+      bootstrap.group(group);
+      bootstrap.channel(NioSocketChannel.class);
+      bootstrap.handler(new ChannelInitializer<Channel>() {
+         @Override
+         public void initChannel(Channel connectedChannel) throws Exception {
+            configureChannel(connectedChannel, sslHandler);
+         }
+      });
+
+      configureNetty(bootstrap, getTransportOptions());
+
+      ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
+      future.addListener(new ChannelFutureListener() {
+
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+            if (!future.isSuccess()) {
+               handleException(future.channel(), IOExceptionSupport.create(future.cause()));
+            }
+         }
+      });
+
+      try {
+         connectLatch.await();
+      } catch (InterruptedException ex) {
+         LOG.debug("Transport connection was interrupted.");
+         // Catching InterruptedException clears the flag; restore it so callers can observe it.
+         Thread.currentThread().interrupt();
+         failureCause = IOExceptionSupport.create(ex);
+      }
+
+      if (failureCause != null) {
+         // Close out any Netty resources now as they are no longer needed.
+         if (channel != null) {
+            channel.close().syncUninterruptibly();
+            channel = null;
+         }
+         if (group != null) {
+            shutdownGroup(group);
+            group = null;
+         }
+
+         throw failureCause;
+      } else {
+         // Connected, allow any held async error to fire now and close the transport.
+         channel.eventLoop().execute(new Runnable() {
+
+            @Override
+            public void run() {
+               if (failureCause != null) {
+                  channel.pipeline().fireExceptionCaught(failureCause);
+               }
+            }
+         });
+      }
+   }
+
+   @Override
+   public boolean isConnected() {
+      return connected.get();
+   }
+
+   @Override
+   public boolean isSSL() {
+      return options.isSSL();
+   }
+
+   /**
+    * Closes the channel and shuts down the event loop group. Only the first call has any effect.
+    */
+   @Override
+   public void close() throws IOException {
+      if (closed.compareAndSet(false, true)) {
+         connected.set(false);
+         try {
+            if (channel != null) {
+               channel.close().syncUninterruptibly();
+            }
+         } finally {
+            // Always release the event loop threads, even if closing the channel threw.
+            if (group != null) {
+               shutdownGroup(group);
+            }
+         }
+      }
+   }
+
+   /**
+    * Allocates an IO buffer whose initial and maximum capacity are both the given size.
+    *
+    * @param size
+    *        the capacity of the buffer to allocate.
+    *
+    * @return a buffer obtained from the channel's allocator.
+    *
+    * @throws IOException
+    *         if the transport is not connected.
+    */
+   @Override
+   public ByteBuf allocateSendBuffer(int size) throws IOException {
+      checkConnected();
+      return channel.alloc().ioBuffer(size, size);
+   }
+
+   /**
+    * Writes and flushes the given buffer to the channel.
+    *
+    * @param output
+    *        the buffer to send.
+    *
+    * @return the future for the write, or null when the buffer has no readable bytes and
+    *         nothing was written.
+    *
+    * @throws IOException
+    *         if the transport is not connected.
+    */
+   @Override
+   public ChannelFuture send(ByteBuf output) throws IOException {
+      checkConnected();
+      int length = output.readableBytes();
+      if (length == 0) {
+         return null;
+      }
+
+      LOG.trace("Attempted write of: {} bytes", length);
+
+      return channel.writeAndFlush(output);
+   }
+
+   @Override
+   public NettyTransportListener getTransportListener() {
+      return listener;
+   }
+
+   @Override
+   public void setTransportListener(NettyTransportListener listener) {
+      this.listener = listener;
+   }
+
+   @Override
+   public NettyTransportOptions getTransportOptions() {
+      return options;
+   }
+
+   @Override
+   public URI getRemoteLocation() {
+      return remote;
+   }
+
+   /**
+    * Returns the local principal of the SSL session.
+    *
+    * @return the local principal, or null when SSL is not enabled, no channel is available, or
+    *         the channel's pipeline holds no {@link SslHandler}.
+    */
+   @Override
+   public Principal getLocalPrincipal() {
+      Principal result = null;
+
+      // Read the field once: it is null before a connection attempt and after a failed one.
+      final Channel current = channel;
+      if (isSSL() && current != null) {
+         SslHandler sslHandler = current.pipeline().get(SslHandler.class);
+         if (sslHandler != null) {
+            result = sslHandler.engine().getSession().getLocalPrincipal();
+         }
+      }
+
+      return result;
+   }
+
+   /**
+    * Sets the maximum frame size.
+    *
+    * @param maxFrameSize
+    *        the new maximum frame size.
+    *
+    * @throws IllegalStateException
+    *         if the transport is currently connected.
+    */
+   @Override
+   public void setMaxFrameSize(int maxFrameSize) {
+      if (connected.get()) {
+         throw new IllegalStateException("Cannot change Max Frame Size while connected.");
+      }
+
+      this.maxFrameSize = maxFrameSize;
+   }
+
+   @Override
+   public int getMaxFrameSize() {
+      return maxFrameSize;
+   }
+
+   // ----- Internal implementation details, can be overridden as needed -----//
+
+   protected String getRemoteHost() {
+      return remote.getHost();
+   }
+
+   /*
+    * Uses the port from the remote URI when one is given, otherwise the default SSL or TCP
+    * port from the options.
+    */
+   protected int getRemotePort() {
+      if (remote.getPort() != -1) {
+         return remote.getPort();
+      } else {
+         return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
+      }
+   }
+
+   /*
+    * Hook for subclasses to add handlers; called after the optional SSL and logging handlers
+    * and before the transport's own channel handler is added.
+    */
+   protected void addAdditionalHandlers(ChannelPipeline pipeline) {
+
+   }
+
+   protected ChannelInboundHandlerAdapter createChannelHandler() {
+      return new NettyTcpTransportHandler();
+   }
+
+   // ----- Event Handlers which can be overridden in subclasses -------------//
+
+   protected void handleConnected(Channel channel) throws Exception {
+      LOG.trace("Channel has become active! Channel is {}", channel);
+      connectionEstablished(channel);
+   }
+
+   protected void handleChannelInactive(Channel channel) throws Exception {
+      LOG.trace("Channel has gone inactive! Channel is {}", channel);
+      // Only notify when we were connected and the closure was not requested through close().
+      if (connected.compareAndSet(true, false) && !closed.get()) {
+         LOG.trace("Firing onTransportClosed listener");
+         listener.onTransportClosed();
+      }
+   }
+
+   protected void handleException(Channel channel, Throwable cause) throws Exception {
+      LOG.trace("Exception on channel! Channel is {}", channel);
+      if (connected.compareAndSet(true, false) && !closed.get()) {
+         LOG.trace("Firing onTransportError listener");
+         if (failureCause != null) {
+            listener.onTransportError(failureCause);
+         } else {
+            listener.onTransportError(cause);
+         }
+      } else {
+         // Hold the first failure for later dispatch if connect succeeds.
+         // This will then trigger disconnect using the first error reported.
+         if (failureCause == null) {
+            LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+            failureCause = IOExceptionSupport.create(cause);
+         }
+
+         connectionFailed(channel, failureCause);
+      }
+   }
+
+   // ----- State change handlers and checks ---------------------------------//
+
+   protected final void checkConnected() throws IOException {
+      if (!connected.get()) {
+         throw new IOException("Cannot send to a non-connected transport.");
+      }
+   }
+
+   /*
+    * Called when the transport has successfully connected and is ready for use.
+    */
+   private void connectionEstablished(Channel connectedChannel) {
+      channel = connectedChannel;
+      connected.set(true);
+      connectLatch.countDown();
+   }
+
+   /*
+    * Called when the transport connection failed and an error should be returned.
+    */
+   private void connectionFailed(Channel failedChannel, IOException cause) {
+      failureCause = cause;
+      channel = failedChannel;
+      connected.set(false);
+      connectLatch.countDown();
+   }
+
+   /*
+    * Requests shutdown of the given event loop group and waits, ignoring interrupts, for at
+    * most twice the shutdown timeout for it to complete.
+    */
+   private void shutdownGroup(EventLoopGroup eventLoopGroup) {
+      Future<?> fut = eventLoopGroup.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+      if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+         LOG.trace("Channel group shutdown failed to complete in allotted time");
+      }
+   }
+
+   private NettyTransportSslOptions getSslOptions() {
+      return (NettyTransportSslOptions) getTransportOptions();
+   }
+
+   /*
+    * Applies the socket level options to the bootstrap; a value of -1 for the buffer sizes or
+    * traffic class leaves the corresponding option unset.
+    */
+   private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+      bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+      bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+      bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+
+      if (options.getSendBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+      }
+
+      if (options.getReceiveBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+      }
+
+      if (options.getTrafficClass() != -1) {
+         bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+      }
+   }
+
+   /*
+    * Builds the pipeline in order: SSL handler (when enabled), byte tracing logger (when
+    * enabled), any subclass supplied handlers, then the transport's channel handler.
+    */
+   private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
+      if (isSSL()) {
+         channel.pipeline().addLast(sslHandler);
+      }
+
+      if (getTransportOptions().isTraceBytes()) {
+         channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
+      }
+
+      addAdditionalHandlers(channel.pipeline());
+
+      channel.pipeline().addLast(createChannelHandler());
+   }
+
+   // ----- Handle connection events -----------------------------------------//
+
+   /**
+    * Base channel handler that routes Netty channel lifecycle events to the transport's
+    * handleConnected, handleChannelInactive and handleException methods.
+    */
+   protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
+
+      @Override
+      public void channelRegistered(ChannelHandlerContext context) throws Exception {
+         // Capture the channel early so a failed connect can still close it.
+         channel = context.channel();
+      }
+
+      @Override
+      public void channelActive(ChannelHandlerContext context) throws Exception {
+         // In the Secure case we need to let the handshake complete before we
+         // trigger the connected event.
+         if (!isSSL()) {
+            handleConnected(context.channel());
+         } else {
+            SslHandler sslHandler = context.pipeline().get(SslHandler.class);
+            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+               @Override
+               public void operationComplete(Future<Channel> future) throws Exception {
+                  if (future.isSuccess()) {
+                     LOG.trace("SSL Handshake has completed: {}", channel);
+                     handleConnected(channel);
+                  } else {
+                     LOG.trace("SSL Handshake has failed: {}", channel);
+                     handleException(channel, future.cause());
+                  }
+               }
+            });
+         }
+      }
+
+      @Override
+      public void channelInactive(ChannelHandlerContext context) throws Exception {
+         handleChannelInactive(context.channel());
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+         handleException(context.channel(), cause);
+      }
+   }
+
+   // ----- Handle Binary data from connection -------------------------------//
+
+   /**
+    * Channel handler that passes each inbound {@link ByteBuf} to the transport listener.
+    */
+   protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
+
+      @Override
+      protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+         LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
+         listener.onData(buffer);
+      }
+   }
+}