You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/05/11 20:53:47 UTC

[2/3] activemq-artemis git commit: ARTEMIS-1159 Fixes and Improvements to the AMQP test client.

ARTEMIS-1159 Fixes and Improvements to the AMQP test client.

Port fixes to the AMQP test client recently made in the 5.x version.
Fixes some thread-safety issues in the Transport.  Ensures more
timely shutdown of the Connection executor.  Uses a dynamic Proxy
to generate read-only Proton wrappers instead of the hand-crafted
versions.  Adds additional logging for test data.



Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/4ad78c7f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/4ad78c7f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/4ad78c7f

Branch: refs/heads/master
Commit: 4ad78c7fd044fe13e6423707559d61228e4c48cd
Parents: a98dccb
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu May 11 16:46:22 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu May 11 16:46:22 2017 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpConnection.java   |  14 +-
 .../transport/amqp/client/AmqpMessage.java      |   4 +-
 .../transport/amqp/client/AmqpReceiver.java     |   4 +-
 .../transport/amqp/client/AmqpSender.java       |   4 +-
 .../transport/amqp/client/AmqpSession.java      |   4 +-
 .../client/transport/NettyTcpTransport.java     | 308 ++++++++-------
 .../amqp/client/transport/NettyTransport.java   |   2 +-
 .../client/transport/NettyTransportFactory.java |  16 +-
 .../transport/NettyTransportListener.java       |  14 +-
 .../client/transport/NettyTransportOptions.java |  59 ++-
 .../transport/NettyTransportSslOptions.java     |  56 ++-
 .../client/transport/NettyTransportSupport.java |  81 ++--
 .../amqp/client/transport/NettyWSTransport.java | 378 +++----------------
 .../client/util/UnmodifiableConnection.java     | 202 ----------
 .../amqp/client/util/UnmodifiableDelivery.java  | 179 ---------
 .../amqp/client/util/UnmodifiableLink.java      | 306 ---------------
 .../amqp/client/util/UnmodifiableProxy.java     | 167 ++++++++
 .../amqp/client/util/UnmodifiableReceiver.java  |  65 ----
 .../amqp/client/util/UnmodifiableSender.java    |  51 ---
 .../amqp/client/util/UnmodifiableSession.java   | 198 ----------
 .../amqp/client/util/UnmodifiableTransport.java | 274 --------------
 21 files changed, 558 insertions(+), 1828 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index a30bae1..062729c 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -25,8 +25,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -39,7 +39,7 @@ import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.IdGenerator;
 import org.apache.activemq.transport.amqp.client.util.NoOpAsyncResult;
-import org.apache.activemq.transport.amqp.client.util.UnmodifiableConnection;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
@@ -74,7 +74,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    public static final long DEFAULT_CLOSE_TIMEOUT = 30000;
    public static final long DEFAULT_DRAIN_TIMEOUT = 60000;
 
-   private final ScheduledExecutorService serializer;
+   private ScheduledThreadPoolExecutor serializer;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicBoolean connected = new AtomicBoolean();
    private final AtomicLong sessionIdGenerator = new AtomicLong();
@@ -121,7 +121,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
       this.connectionId = CONNECTION_ID_GENERATOR.generateId();
       this.remoteURI = transport.getRemoteLocation();
 
-      this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+      this.serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
 
          @Override
          public Thread newThread(Runnable runner) {
@@ -132,6 +132,10 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
          }
       });
 
+      // Ensure timely shutdown
+      this.serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+      this.serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
       this.transport.setTransportListener(this);
    }
 
@@ -434,7 +438,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
    }
 
    public Connection getConnection() {
-      return new UnmodifiableConnection(getEndpoint());
+      return UnmodifiableProxy.connectionProxy(getEndpoint());
    }
 
    public AmqpConnectionListener getListener() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index bf9e0b5..a10d27a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -20,7 +20,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
-import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -100,7 +100,7 @@ public class AmqpMessage {
     */
    public Delivery getWrappedDelivery() {
       if (delivery != null) {
-         return new UnmodifiableDelivery(delivery);
+         return UnmodifiableProxy.deliveryProxy(delivery);
       }
 
       return null;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
index 8653cff..c2c7217 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java
@@ -35,7 +35,7 @@ import javax.jms.InvalidDestinationException;
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
 import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.apache.activemq.transport.amqp.client.util.UnmodifiableReceiver;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -677,7 +677,7 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> {
     * @return an unmodifiable view of the underlying Receiver instance.
     */
    public Receiver getReceiver() {
-      return new UnmodifiableReceiver(getEndpoint());
+      return UnmodifiableProxy.receiverProxy(getEndpoint());
    }
 
    // ----- Receiver configuration properties --------------------------------//

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
index 03bd28e..846739a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java
@@ -29,7 +29,7 @@ import javax.jms.InvalidDestinationException;
 
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
-import org.apache.activemq.transport.amqp.client.util.UnmodifiableSender;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
@@ -231,7 +231,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
     * @return an unmodifiable view of the underlying Sender instance.
     */
    public Sender getSender() {
-      return new UnmodifiableSender(getEndpoint());
+      return UnmodifiableProxy.senderProxy(getEndpoint());
    }
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
index 677b354..8c331ca 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
-import org.apache.activemq.transport.amqp.client.util.UnmodifiableSession;
+import org.apache.activemq.transport.amqp.client.util.UnmodifiableProxy;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
@@ -598,7 +598,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> {
    }
 
    public Session getSession() {
-      return new UnmodifiableSession(getEndpoint());
+      return UnmodifiableProxy.sessionProxy(getEndpoint());
    }
 
    public boolean isInTransaction() {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
index 29963a0..473fd75 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -23,25 +23,29 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LoggingHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * TCP based transport that uses Netty as the underlying IO layer.
@@ -50,28 +54,27 @@ public class NettyTcpTransport implements NettyTransport {
 
    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
 
-   private static final int QUIET_PERIOD = 20;
    private static final int SHUTDOWN_TIMEOUT = 100;
 
    protected Bootstrap bootstrap;
    protected EventLoopGroup group;
    protected Channel channel;
    protected NettyTransportListener listener;
-   protected NettyTransportOptions options;
+   protected final NettyTransportOptions options;
    protected final URI remote;
-   protected boolean secure;
 
    private final AtomicBoolean connected = new AtomicBoolean();
    private final AtomicBoolean closed = new AtomicBoolean();
    private final CountDownLatch connectLatch = new CountDownLatch(1);
-   private IOException failureCause;
-   private Throwable pendingFailure;
+   private volatile IOException failureCause;
 
    /**
     * Create a new transport instance
     *
-    * @param remoteLocation the URI that defines the remote resource to connect to.
-    * @param options        the transport options used to configure the socket connection.
+    * @param remoteLocation
+    *        the URI that defines the remote resource to connect to.
+    * @param options
+    *        the transport options used to configure the socket connection.
     */
    public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
       this(null, remoteLocation, options);
@@ -80,15 +83,25 @@ public class NettyTcpTransport implements NettyTransport {
    /**
     * Create a new transport instance
     *
-    * @param listener       the TransportListener that will receive events from this Transport.
-    * @param remoteLocation the URI that defines the remote resource to connect to.
-    * @param options        the transport options used to configure the socket connection.
+    * @param listener
+    *        the TransportListener that will receive events from this Transport.
+    * @param remoteLocation
+    *        the URI that defines the remote resource to connect to.
+    * @param options
+    *        the transport options used to configure the socket connection.
     */
    public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+      if (options == null) {
+         throw new IllegalArgumentException("Transport Options cannot be null");
+      }
+
+      if (remoteLocation == null) {
+         throw new IllegalArgumentException("Transport remote location cannot be null");
+      }
+
       this.options = options;
       this.listener = listener;
       this.remote = remoteLocation;
-      this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
    }
 
    @Override
@@ -98,16 +111,27 @@ public class NettyTcpTransport implements NettyTransport {
          throw new IllegalStateException("A transport listener must be set before connection attempts.");
       }
 
+      final SslHandler sslHandler;
+      if (isSSL()) {
+         try {
+            sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+         } catch (Exception ex) {
+            // TODO: can we stop it throwing Exception?
+            throw IOExceptionSupport.create(ex);
+         }
+      } else {
+         sslHandler = null;
+      }
+
       group = new NioEventLoopGroup(1);
 
       bootstrap = new Bootstrap();
       bootstrap.group(group);
       bootstrap.channel(NioSocketChannel.class);
       bootstrap.handler(new ChannelInitializer<Channel>() {
-
          @Override
          public void initChannel(Channel connectedChannel) throws Exception {
-            configureChannel(connectedChannel);
+            configureChannel(connectedChannel, sslHandler);
          }
       });
 
@@ -118,12 +142,8 @@ public class NettyTcpTransport implements NettyTransport {
 
          @Override
          public void operationComplete(ChannelFuture future) throws Exception {
-            if (future.isSuccess()) {
-               handleConnected(future.channel());
-            } else if (future.isCancelled()) {
-               connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
-            } else {
-               connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+            if (!future.isSuccess()) {
+               handleException(future.channel(), IOExceptionSupport.create(future.cause()));
             }
          }
       });
@@ -143,7 +163,10 @@ public class NettyTcpTransport implements NettyTransport {
             channel = null;
          }
          if (group != null) {
-            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+               LOG.trace("Channel group shutdown failed to complete in allotted time");
+            }
             group = null;
          }
 
@@ -154,8 +177,8 @@ public class NettyTcpTransport implements NettyTransport {
 
             @Override
             public void run() {
-               if (pendingFailure != null) {
-                  channel.pipeline().fireExceptionCaught(pendingFailure);
+               if (failureCause != null) {
+                  channel.pipeline().fireExceptionCaught(failureCause);
                }
             }
          });
@@ -169,18 +192,24 @@ public class NettyTcpTransport implements NettyTransport {
 
    @Override
    public boolean isSSL() {
-      return secure;
+      return options.isSSL();
    }
 
    @Override
    public void close() throws IOException {
       if (closed.compareAndSet(false, true)) {
          connected.set(false);
-         if (channel != null) {
-            channel.close().syncUninterruptibly();
-         }
-         if (group != null) {
-            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+         try {
+            if (channel != null) {
+               channel.close().syncUninterruptibly();
+            }
+         } finally {
+            if (group != null) {
+               Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+               if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+                  LOG.trace("Channel group shutdown failed to complete in allotted time");
+               }
+            }
          }
       }
    }
@@ -216,14 +245,6 @@ public class NettyTcpTransport implements NettyTransport {
 
    @Override
    public NettyTransportOptions getTransportOptions() {
-      if (options == null) {
-         if (isSSL()) {
-            options = NettyTransportSslOptions.INSTANCE;
-         } else {
-            options = NettyTransportOptions.INSTANCE;
-         }
-      }
-
       return options;
    }
 
@@ -234,102 +255,96 @@ public class NettyTcpTransport implements NettyTransport {
 
    @Override
    public Principal getLocalPrincipal() {
-      if (!isSSL()) {
-         throw new UnsupportedOperationException("Not connected to a secure channel");
-      }
+      Principal result = null;
 
-      SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+      if (isSSL()) {
+         SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+         result = sslHandler.engine().getSession().getLocalPrincipal();
+      }
 
-      return sslHandler.engine().getSession().getLocalPrincipal();
+      return result;
    }
 
-   //----- Internal implementation details, can be overridden as needed --//
+   // ----- Internal implementation details, can be overridden as needed -----//
 
    protected String getRemoteHost() {
       return remote.getHost();
    }
 
    protected int getRemotePort() {
-      int port = remote.getPort();
-
-      if (port <= 0) {
-         if (isSSL()) {
-            port = getSslOptions().getDefaultSslPort();
-         } else {
-            port = getTransportOptions().getDefaultTcpPort();
-         }
+      if (remote.getPort() != -1) {
+         return remote.getPort();
+      } else {
+         return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
       }
+   }
+
+   protected void addAdditionalHandlers(ChannelPipeline pipeline) {
 
-      return port;
    }
 
-   protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
-      bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
-      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
-      bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
-      bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+   protected ChannelInboundHandlerAdapter createChannelHandler() {
+      return new NettyTcpTransportHandler();
+   }
 
-      if (options.getSendBufferSize() != -1) {
-         bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
-      }
+   // ----- Event Handlers which can be overridden in subclasses -------------//
 
-      if (options.getReceiveBufferSize() != -1) {
-         bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
-         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
-      }
+   protected void handleConnected(Channel channel) throws Exception {
+      LOG.trace("Channel has become active! Channel is {}", channel);
+      connectionEstablished(channel);
+   }
 
-      if (options.getTrafficClass() != -1) {
-         bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+   protected void handleChannelInactive(Channel channel) throws Exception {
+      LOG.trace("Channel has gone inactive! Channel is {}", channel);
+      if (connected.compareAndSet(true, false) && !closed.get()) {
+         LOG.trace("Firing onTransportClosed listener");
+         listener.onTransportClosed();
       }
    }
 
-   protected void configureChannel(final Channel channel) throws Exception {
-      if (isSSL()) {
-         SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
-         sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
-            @Override
-            public void operationComplete(Future<Channel> future) throws Exception {
-               if (future.isSuccess()) {
-                  LOG.trace("SSL Handshake has completed: {}", channel);
-                  connectionEstablished(channel);
-               } else {
-                  LOG.trace("SSL Handshake has failed: {}", channel);
-                  connectionFailed(channel, IOExceptionSupport.create(future.cause()));
-               }
-            }
-         });
+   protected void handleException(Channel channel, Throwable cause) throws Exception {
+      LOG.trace("Exception on channel! Channel is {}", channel);
+      if (connected.compareAndSet(true, false) && !closed.get()) {
+         LOG.trace("Firing onTransportError listener");
+         if (failureCause != null) {
+            listener.onTransportError(failureCause);
+         } else {
+            listener.onTransportError(cause);
+         }
+      } else {
+         // Hold the first failure for later dispatch if connect succeeds.
+         // This will then trigger disconnect using the first error reported.
+         if (failureCause == null) {
+            LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+            failureCause = IOExceptionSupport.create(cause);
+         }
 
-         channel.pipeline().addLast(sslHandler);
+         connectionFailed(channel, failureCause);
       }
-
-      channel.pipeline().addLast(new NettyTcpTransportHandler());
    }
 
-   protected void handleConnected(final Channel channel) throws Exception {
-      if (!isSSL()) {
-         connectionEstablished(channel);
+   // ----- State change handlers and checks ---------------------------------//
+
+   protected final void checkConnected() throws IOException {
+      if (!connected.get()) {
+         throw new IOException("Cannot send to a non-connected transport.");
       }
    }
 
-   //----- State change handlers and checks ---------------------------------//
-
-   /**
+   /*
     * Called when the transport has successfully connected and is ready for use.
     */
-   protected void connectionEstablished(Channel connectedChannel) {
+   private void connectionEstablished(Channel connectedChannel) {
       channel = connectedChannel;
       connected.set(true);
       connectLatch.countDown();
    }
 
-   /**
+   /*
     * Called when the transport connection failed and an error should be returned.
-    *
-    * @param failedChannel The Channel instance that failed.
-    * @param cause         An IOException that describes the cause of the failed connection.
     */
-   protected void connectionFailed(Channel failedChannel, IOException cause) {
-      failureCause = IOExceptionSupport.create(cause);
+   private void connectionFailed(Channel failedChannel, IOException cause) {
+      failureCause = cause;
       channel = failedChannel;
       connected.set(false);
       connectLatch.countDown();
@@ -339,49 +354,86 @@ public class NettyTcpTransport implements NettyTransport {
       return (NettyTransportSslOptions) getTransportOptions();
    }
 
-   private void checkConnected() throws IOException {
-      if (!connected.get()) {
-         throw new IOException("Cannot send to a non-connected transport.");
+   private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+      bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+      bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+      bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+
+      if (options.getSendBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
       }
+
+      if (options.getReceiveBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+      }
+
+      if (options.getTrafficClass() != -1) {
+         bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+      }
+   }
+
+   private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
+      if (isSSL()) {
+         channel.pipeline().addLast(sslHandler);
+      }
+
+      if (getTransportOptions().isTraceBytes()) {
+         channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
+      }
+
+      addAdditionalHandlers(channel.pipeline());
+
+      channel.pipeline().addLast(createChannelHandler());
    }
 
-   //----- Handle connection events -----------------------------------------//
+   // ----- Handle connection events -----------------------------------------//
 
-   private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+   protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
+
+      @Override
+      public void channelRegistered(ChannelHandlerContext context) throws Exception {
+         channel = context.channel();
+      }
 
       @Override
       public void channelActive(ChannelHandlerContext context) throws Exception {
-         LOG.trace("Channel has become active! Channel is {}", context.channel());
+         // In the Secure case we need to let the handshake complete before we
+         // trigger the connected event.
+         if (!isSSL()) {
+            handleConnected(context.channel());
+         } else {
+            SslHandler sslHandler = context.pipeline().get(SslHandler.class);
+            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+               @Override
+               public void operationComplete(Future<Channel> future) throws Exception {
+                  if (future.isSuccess()) {
+                     LOG.trace("SSL Handshake has completed: {}", channel);
+                     handleConnected(channel);
+                  } else {
+                     LOG.trace("SSL Handshake has failed: {}", channel);
+                     handleException(channel, future.cause());
+                  }
+               }
+            });
+         }
       }
 
       @Override
       public void channelInactive(ChannelHandlerContext context) throws Exception {
-         LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
-         if (connected.compareAndSet(true, false) && !closed.get()) {
-            LOG.trace("Firing onTransportClosed listener");
-            listener.onTransportClosed();
-         }
+         handleChannelInactive(context.channel());
       }
 
       @Override
       public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
-         LOG.trace("Exception on channel! Channel is {}", context.channel());
-         if (connected.compareAndSet(true, false) && !closed.get()) {
-            LOG.trace("Firing onTransportError listener");
-            if (pendingFailure != null) {
-               listener.onTransportError(pendingFailure);
-            } else {
-               listener.onTransportError(cause);
-            }
-         } else {
-            // Hold the first failure for later dispatch if connect succeeds.
-            // This will then trigger disconnect using the first error reported.
-            if (pendingFailure != null) {
-               LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
-               pendingFailure = cause;
-            }
-         }
+         handleException(context.channel(), cause);
       }
+   }
+
+   // ----- Handle Binary data from connection -------------------------------//
+
+   protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
 
       @Override
       protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
index a2bacdc..ad0a1fb 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
@@ -23,7 +23,7 @@ import java.security.Principal;
 import io.netty.buffer.ByteBuf;
 
 /**
- *
+ * Base for all Netty based Transports in this client.
  */
 public interface NettyTransport {
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
index f6eae46..30b2e21 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,12 +30,16 @@ public final class NettyTransportFactory {
    }
 
    /**
-    * Creates an instance of the given Transport and configures it using the
-    * properties set on the given remote broker URI.
+    * Creates an instance of the given Transport and configures it using the properties set on
+    * the given remote broker URI.
+    *
+    * @param remoteURI
+    *        The URI used to connect to a remote Peer.
     *
-    * @param remoteURI The URI used to connect to a remote Peer.
     * @return a new Transport instance.
-    * @throws Exception if an error occurs while creating the Transport instance.
+    *
+    * @throws Exception
+    *         if an error occurs while creating the Transport instance.
     */
    public static NettyTransport createTransport(URI remoteURI) throws Exception {
       Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
index c23ca8c..0163517 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,15 +19,16 @@ package org.apache.activemq.transport.amqp.client.transport;
 import io.netty.buffer.ByteBuf;
 
 /**
- * Listener interface that should be implemented by users of the various
- * QpidJMS Transport classes.
+ * Listener interface that should be implemented by users of the various QpidJMS Transport
+ * classes.
  */
 public interface NettyTransportListener {
 
    /**
     * Called when new incoming data has become available.
     *
-    * @param incoming the next incoming packet of data.
+    * @param incoming
+    *        the next incoming packet of data.
     */
    void onData(ByteBuf incoming);
 
@@ -39,7 +40,8 @@ public interface NettyTransportListener {
    /**
     * Called when an error occurs during normal Transport operations.
     *
-    * @param cause the error that triggered this event.
+    * @param cause
+    *        the error that triggered this event.
     */
    void onTransportError(Throwable cause);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
index 3ffb8c8..c5022c1 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -30,6 +30,7 @@ public class NettyTransportOptions implements Cloneable {
    public static final int DEFAULT_SO_TIMEOUT = -1;
    public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
    public static final int DEFAULT_TCP_PORT = 5672;
+   public static final boolean DEFAULT_TRACE_BYTES = false;
 
    public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
 
@@ -42,6 +43,7 @@ public class NettyTransportOptions implements Cloneable {
    private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
    private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
    private int defaultTcpPort = DEFAULT_TCP_PORT;
+   private boolean traceBytes = DEFAULT_TRACE_BYTES;
 
    /**
     * @return the currently set send buffer size in bytes.
@@ -51,11 +53,14 @@ public class NettyTransportOptions implements Cloneable {
    }
 
    /**
-    * Sets the send buffer size in bytes, the value must be greater than zero
-    * or an {@link IllegalArgumentException} will be thrown.
+    * Sets the send buffer size in bytes, the value must be greater than zero or an
+    * {@link IllegalArgumentException} will be thrown.
+    *
+    * @param sendBufferSize
+    *        the new send buffer size for the TCP Transport.
     *
-    * @param sendBufferSize the new send buffer size for the TCP Transport.
-    * @throws IllegalArgumentException if the value given is not in the valid range.
+    * @throws IllegalArgumentException
+    *         if the value given is not in the valid range.
     */
    public void setSendBufferSize(int sendBufferSize) {
       if (sendBufferSize <= 0) {
@@ -73,11 +78,14 @@ public class NettyTransportOptions implements Cloneable {
    }
 
    /**
-    * Sets the receive buffer size in bytes, the value must be greater than zero
-    * or an {@link IllegalArgumentException} will be thrown.
+    * Sets the receive buffer size in bytes, the value must be greater than zero or an
+    * {@link IllegalArgumentException} will be thrown.
     *
-    * @param receiveBufferSize the new receive buffer size for the TCP Transport.
-    * @throws IllegalArgumentException if the value given is not in the valid range.
+    * @param receiveBufferSize
+    *        the new receive buffer size for the TCP Transport.
+    *
+    * @throws IllegalArgumentException
+    *         if the value given is not in the valid range.
     */
    public void setReceiveBufferSize(int receiveBufferSize) {
       if (receiveBufferSize <= 0) {
@@ -95,11 +103,13 @@ public class NettyTransportOptions implements Cloneable {
    }
 
    /**
-    * Sets the traffic class value used by the TCP connection, valid
-    * range is between 0 and 255.
+    * Sets the traffic class value used by the TCP connection, valid range is between 0 and 255.
+    *
+    * @param trafficClass
+    *        the new traffic class value.
     *
-    * @param trafficClass the new traffic class value.
-    * @throws IllegalArgumentException if the value given is not in the valid range.
+    * @throws IllegalArgumentException
+    *         if the value given is not in the valid range.
     */
    public void setTrafficClass(int trafficClass) {
       if (trafficClass < 0 || trafficClass > 255) {
@@ -157,6 +167,27 @@ public class NettyTransportOptions implements Cloneable {
       this.defaultTcpPort = defaultTcpPort;
    }
 
+   /**
+    * @return true if the transport should enable byte tracing
+    */
+   public boolean isTraceBytes() {
+      return traceBytes;
+   }
+
+   /**
+    * Sets whether the transport should add a logger for bytes in / out.
+    *
+    * @param traceBytes
+    *        should the transport log the bytes in and out.
+    */
+   public void setTraceBytes(boolean traceBytes) {
+      this.traceBytes = traceBytes;
+   }
+
+   public boolean isSSL() {
+      return false;
+   }
+
    @Override
    public NettyTransportOptions clone() {
       return copyOptions(new NettyTransportOptions());

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
index e256fbb..3289fce 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,9 +21,8 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * Holds the defined SSL options for connections that operate over a secure
- * transport.  Options are read from the environment and can be overridden by
- * specifying them on the connection URI.
+ * Holds the defined SSL options for connections that operate over a secure transport. Options
+ * are read from the environment and can be overridden by specifying them on the connection URI.
  */
 public class NettyTransportSslOptions extends NettyTransportOptions {
 
@@ -31,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
    public static final boolean DEFAULT_TRUST_ALL = false;
    public static final boolean DEFAULT_VERIFY_HOST = false;
-   public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
+   public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"}));
    public static final int DEFAULT_SSL_PORT = 5671;
 
    public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
@@ -69,7 +68,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    /**
     * Sets the location on disk of the key store to use.
     *
-    * @param keyStoreLocation the keyStoreLocation to use to create the key manager.
+    * @param keyStoreLocation
+    *        the keyStoreLocation to use to create the key manager.
     */
    public void setKeyStoreLocation(String keyStoreLocation) {
       this.keyStoreLocation = keyStoreLocation;
@@ -83,7 +83,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param keyStorePassword the keyStorePassword to set
+    * @param keyStorePassword
+    *        the keyStorePassword to set
     */
    public void setKeyStorePassword(String keyStorePassword) {
       this.keyStorePassword = keyStorePassword;
@@ -97,7 +98,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param trustStoreLocation the trustStoreLocation to set
+    * @param trustStoreLocation
+    *        the trustStoreLocation to set
     */
    public void setTrustStoreLocation(String trustStoreLocation) {
       this.trustStoreLocation = trustStoreLocation;
@@ -111,7 +113,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param trustStorePassword the trustStorePassword to set
+    * @param trustStorePassword
+    *        the trustStorePassword to set
     */
    public void setTrustStorePassword(String trustStorePassword) {
       this.trustStorePassword = trustStorePassword;
@@ -125,7 +128,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param storeType the format that the store files are encoded in.
+    * @param storeType
+    *        the format that the store files are encoded in.
     */
    public void setStoreType(String storeType) {
       this.storeType = storeType;
@@ -139,7 +143,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param enabledCipherSuites the enabledCipherSuites to set
+    * @param enabledCipherSuites
+    *        the enabledCipherSuites to set
     */
    public void setEnabledCipherSuites(String[] enabledCipherSuites) {
       this.enabledCipherSuites = enabledCipherSuites;
@@ -153,7 +158,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param disabledCipherSuites the disabledCipherSuites to set
+    * @param disabledCipherSuites
+    *        the disabledCipherSuites to set
     */
    public void setDisabledCipherSuites(String[] disabledCipherSuites) {
       this.disabledCipherSuites = disabledCipherSuites;
@@ -169,13 +175,15 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    /**
     * The protocols to be set as enabled.
     *
-    * @param enabledProtocols the enabled protocols to set, or null if the defaults should be used.
+    * @param enabledProtocols
+    *        the enabled protocols to set, or null if the defaults should be used.
     */
    public void setEnabledProtocols(String[] enabledProtocols) {
       this.enabledProtocols = enabledProtocols;
    }
 
    /**
+    *
     * @return the protocols to disable or null if none should be
     */
    public String[] getDisabledProtocols() {
@@ -185,7 +193,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    /**
     * The protocols to be disabled.
     *
-    * @param disabledProtocols the protocols to disable, or null if none should be.
+    * @param disabledProtocols
+    *        the protocols to disable, or null if none should be.
     */
    public void setDisabledProtocols(String[] disabledProtocols) {
       this.disabledProtocols = disabledProtocols;
@@ -202,7 +211,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
     * The protocol value to use when creating an SSLContext via
     * SSLContext.getInstance(protocol).
     *
-    * @param contextProtocol the context protocol to use.
+    * @param contextProtocol
+    *        the context protocol to use.
     */
    public void setContextProtocol(String contextProtocol) {
       this.contextProtocol = contextProtocol;
@@ -216,7 +226,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param trustAll the trustAll to set
+    * @param trustAll
+    *        the trustAll to set
     */
    public void setTrustAll(boolean trustAll) {
       this.trustAll = trustAll;
@@ -230,7 +241,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param verifyHost the verifyHost to set
+    * @param verifyHost
+    *        the verifyHost to set
     */
    public void setVerifyHost(boolean verifyHost) {
       this.verifyHost = verifyHost;
@@ -244,7 +256,8 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    /**
-    * @param keyAlias the key alias to use
+    * @param keyAlias
+    *        the key alias to use
     */
    public void setKeyAlias(String keyAlias) {
       this.keyAlias = keyAlias;
@@ -259,6 +272,11 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
    }
 
    @Override
+   public boolean isSSL() {
+      return true;
+   }
+
+   @Override
    public NettyTransportSslOptions clone() {
       return copyOptions(new NettyTransportSslOptions());
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
index 15854e8..d41c669 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -16,15 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.client.transport;
 
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509ExtendedKeyManager;
-import javax.net.ssl.X509TrustManager;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
@@ -38,10 +29,21 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import io.netty.handler.ssl.SslHandler;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedKeyManager;
+import javax.net.ssl.X509TrustManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.handler.ssl.SslHandler;
+
 /**
  * Static class that provides various utility methods used by Transport implementations.
  */
@@ -50,13 +52,18 @@ public class NettyTransportSupport {
    private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
 
    /**
-    * Creates a Netty SslHandler instance for use in Transports that require
-    * an SSL encoder / decoder.
+    * Creates a Netty SslHandler instance for use in Transports that require an SSL encoder /
+    * decoder.
+    *
+    * @param remote
+    *        The URI of the remote peer that the SslHandler will be used against.
+    * @param options
+    *        The SSL options object to build the SslHandler instance from.
     *
-    * @param remote  The URI of the remote peer that the SslHandler will be used against.
-    * @param options The SSL options object to build the SslHandler instance from.
     * @return a new SslHandler that is configured from the given options.
-    * @throws Exception if an error occurs while creating the SslHandler instance.
+    *
+    * @throws Exception
+    *         if an error occurs while creating the SslHandler instance.
     */
    public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
       return new SslHandler(createSslEngine(remote, createSslContext(options), options));
@@ -66,9 +73,13 @@ public class NettyTransportSupport {
     * Create a new SSLContext using the options specified in the given TransportSslOptions
     * instance.
     *
-    * @param options the configured options used to create the SSLContext.
+    * @param options
+    *        the configured options used to create the SSLContext.
+    *
     * @return a new SSLContext instance.
-    * @throws Exception if an error occurs while creating the context.
+    *
+    * @throws Exception
+    *         if an error occurs while creating the context.
     */
    public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
       try {
@@ -91,10 +102,15 @@ public class NettyTransportSupport {
     * Create a new SSLEngine instance in client mode from the given SSLContext and
     * TransportSslOptions instances.
     *
-    * @param context the SSLContext to use when creating the engine.
-    * @param options the TransportSslOptions to use to configure the new SSLEngine.
+    * @param context
+    *        the SSLContext to use when creating the engine.
+    * @param options
+    *        the TransportSslOptions to use to configure the new SSLEngine.
+    *
     * @return a new SSLEngine instance in client mode.
-    * @throws Exception if an error occurs while creating the new SSLEngine.
+    *
+    * @throws Exception
+    *         if an error occurs while creating the new SSLEngine.
     */
    public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
       return createSslEngine(null, context, options);
@@ -104,15 +120,20 @@ public class NettyTransportSupport {
     * Create a new SSLEngine instance in client mode from the given SSLContext and
     * TransportSslOptions instances.
     *
-    * @param remote  the URI of the remote peer that will be used to initialize the engine, may be null if none should.
-    * @param context the SSLContext to use when creating the engine.
-    * @param options the TransportSslOptions to use to configure the new SSLEngine.
+    * @param remote
+    *        the URI of the remote peer that will be used to initialize the engine, may be null
+    *        if none should be used.
+    * @param context
+    *        the SSLContext to use when creating the engine.
+    * @param options
+    *        the TransportSslOptions to use to configure the new SSLEngine.
+    *
     * @return a new SSLEngine instance in client mode.
-    * @throws Exception if an error occurs while creating the new SSLEngine.
+    *
+    * @throws Exception
+    *         if an error occurs while creating the new SSLEngine.
     */
-   public static SSLEngine createSslEngine(URI remote,
-                                           SSLContext context,
-                                           NettyTransportSslOptions options) throws Exception {
+   public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception {
       SSLEngine engine = null;
       if (remote == null) {
          engine = context.createSSLEngine();
@@ -185,7 +206,7 @@ public class NettyTransportSupport {
 
    private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
       if (options.isTrustAll()) {
-         return new TrustManager[]{createTrustAllTrustManager()};
+         return new TrustManager[] {createTrustAllTrustManager()};
       }
 
       if (options.getTrustStoreLocation() == null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
index f75a52e..eb595d0 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -18,73 +18,46 @@ package org.apache.activemq.transport.amqp.client.transport;
 
 import java.io.IOException;
 import java.net.URI;
-import java.security.Principal;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.nio.charset.StandardCharsets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Transport for communicating over WebSockets
  */
-public class NettyWSTransport implements NettyTransport {
+public class NettyWSTransport extends NettyTcpTransport {
 
    private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
 
-   private static final int QUIET_PERIOD = 20;
-   private static final int SHUTDOWN_TIMEOUT = 100;
-
-   protected Bootstrap bootstrap;
-   protected EventLoopGroup group;
-   protected Channel channel;
-   protected NettyTransportListener listener;
-   protected NettyTransportOptions options;
-   protected final URI remote;
-   protected boolean secure;
-
-   private final AtomicBoolean connected = new AtomicBoolean();
-   private final AtomicBoolean closed = new AtomicBoolean();
-   private ChannelPromise handshakeFuture;
-   private IOException failureCause;
-   private Throwable pendingFailure;
+   private static final String AMQP_SUB_PROTOCOL = "amqp";
 
    /**
     * Create a new transport instance
     *
-    * @param remoteLocation the URI that defines the remote resource to connect to.
-    * @param options        the transport options used to configure the socket connection.
+    * @param remoteLocation
+    *        the URI that defines the remote resource to connect to.
+    * @param options
+    *        the transport options used to configure the socket connection.
     */
    public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
       this(null, remoteLocation, options);
@@ -93,119 +66,15 @@ public class NettyWSTransport implements NettyTransport {
    /**
     * Create a new transport instance
     *
-    * @param listener       the TransportListener that will receive events from this Transport.
-    * @param remoteLocation the URI that defines the remote resource to connect to.
-    * @param options        the transport options used to configure the socket connection.
+    * @param listener
+    *        the TransportListener that will receive events from this Transport.
+    * @param remoteLocation
+    *        the URI that defines the remote resource to connect to.
+    * @param options
+    *        the transport options used to configure the socket connection.
     */
    public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
-      this.options = options;
-      this.listener = listener;
-      this.remote = remoteLocation;
-      this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
-   }
-
-   @Override
-   public void connect() throws IOException {
-
-      if (listener == null) {
-         throw new IllegalStateException("A transport listener must be set before connection attempts.");
-      }
-
-      group = new NioEventLoopGroup(1);
-
-      bootstrap = new Bootstrap();
-      bootstrap.group(group);
-      bootstrap.channel(NioSocketChannel.class);
-      bootstrap.handler(new ChannelInitializer<Channel>() {
-
-         @Override
-         public void initChannel(Channel connectedChannel) throws Exception {
-            configureChannel(connectedChannel);
-         }
-      });
-
-      configureNetty(bootstrap, getTransportOptions());
-
-      ChannelFuture future;
-      try {
-         future = bootstrap.connect(getRemoteHost(), getRemotePort());
-         future.addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-               if (future.isSuccess()) {
-                  handleConnected(future.channel());
-               } else if (future.isCancelled()) {
-                  connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
-               } else {
-                  connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
-               }
-            }
-         });
-
-         future.sync();
-
-         // Now wait for WS protocol level handshake completion
-         handshakeFuture.await();
-      } catch (InterruptedException ex) {
-         LOG.debug("Transport connection attempt was interrupted.");
-         Thread.interrupted();
-         failureCause = IOExceptionSupport.create(ex);
-      }
-
-      if (failureCause != null) {
-         // Close out any Netty resources now as they are no longer needed.
-         if (channel != null) {
-            channel.close().syncUninterruptibly();
-            channel = null;
-         }
-         if (group != null) {
-            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-            group = null;
-         }
-
-         throw failureCause;
-      } else {
-         // Connected, allow any held async error to fire now and close the transport.
-         channel.eventLoop().execute(new Runnable() {
-
-            @Override
-            public void run() {
-               if (pendingFailure != null) {
-                  channel.pipeline().fireExceptionCaught(pendingFailure);
-               }
-            }
-         });
-      }
-   }
-
-   @Override
-   public boolean isConnected() {
-      return connected.get();
-   }
-
-   @Override
-   public boolean isSSL() {
-      return secure;
-   }
-
-   @Override
-   public void close() throws IOException {
-      if (closed.compareAndSet(false, true)) {
-         connected.set(false);
-         if (channel != null) {
-            channel.close().syncUninterruptibly();
-         }
-         if (group != null) {
-            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-         }
-      }
-   }
-
-   @Override
-   public ByteBuf allocateSendBuffer(int size) throws IOException {
-      checkConnected();
-      return channel.alloc().ioBuffer(size, size);
+      super(listener, remoteLocation, options);
    }
 
    @Override
@@ -222,202 +91,37 @@ public class NettyWSTransport implements NettyTransport {
    }
 
    @Override
-   public NettyTransportListener getTransportListener() {
-      return listener;
+   protected ChannelInboundHandlerAdapter createChannelHandler() {
+      return new NettyWebSocketTransportHandler();
    }
 
    @Override
-   public void setTransportListener(NettyTransportListener listener) {
-      this.listener = listener;
+   protected void addAdditionalHandlers(ChannelPipeline pipeline) {
+      pipeline.addLast(new HttpClientCodec());
+      pipeline.addLast(new HttpObjectAggregator(8192));
    }
 
    @Override
-   public NettyTransportOptions getTransportOptions() {
-      if (options == null) {
-         if (isSSL()) {
-            options = NettyTransportSslOptions.INSTANCE;
-         } else {
-            options = NettyTransportOptions.INSTANCE;
-         }
-      }
-
-      return options;
+   protected void handleConnected(Channel channel) throws Exception {
+      LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel);
    }
 
-   @Override
-   public URI getRemoteLocation() {
-      return remote;
-   }
-
-   @Override
-   public Principal getLocalPrincipal() {
-      if (!isSSL()) {
-         throw new UnsupportedOperationException("Not connected to a secure channel");
-      }
-
-      SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+   // ----- Handle connection events -----------------------------------------//
 
-      return sslHandler.engine().getSession().getLocalPrincipal();
-   }
-
-   //----- Internal implementation details, can be overridden as needed --//
-
-   protected String getRemoteHost() {
-      return remote.getHost();
-   }
-
-   protected int getRemotePort() {
-      int port = remote.getPort();
-
-      if (port <= 0) {
-         if (isSSL()) {
-            port = getSslOptions().getDefaultSslPort();
-         } else {
-            port = getTransportOptions().getDefaultTcpPort();
-         }
-      }
-
-      return port;
-   }
-
-   protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
-      bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
-      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
-      bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
-      bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-
-      if (options.getSendBufferSize() != -1) {
-         bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
-      }
-
-      if (options.getReceiveBufferSize() != -1) {
-         bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
-         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
-      }
-
-      if (options.getTrafficClass() != -1) {
-         bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
-      }
-   }
-
-   protected void configureChannel(final Channel channel) throws Exception {
-      if (isSSL()) {
-         SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
-         sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
-            @Override
-            public void operationComplete(Future<Channel> future) throws Exception {
-               if (future.isSuccess()) {
-                  LOG.trace("SSL Handshake has completed: {}", channel);
-                  connectionEstablished(channel);
-               } else {
-                  LOG.trace("SSL Handshake has failed: {}", channel);
-                  connectionFailed(channel, IOExceptionSupport.create(future.cause()));
-               }
-            }
-         });
-
-         channel.pipeline().addLast(sslHandler);
-      }
-
-      channel.pipeline().addLast(new HttpClientCodec());
-      channel.pipeline().addLast(new HttpObjectAggregator(8192));
-      channel.pipeline().addLast(new NettyTcpTransportHandler());
-   }
-
-   protected void handleConnected(final Channel channel) throws Exception {
-      if (!isSSL()) {
-         connectionEstablished(channel);
-      }
-   }
-
-   //----- State change handlers and checks ---------------------------------//
-
-   /**
-    * Called when the transport has successfully connected and is ready for use.
-    */
-   protected void connectionEstablished(Channel connectedChannel) {
-      LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
-      channel = connectedChannel;
-      connected.set(true);
-   }
-
-   /**
-    * Called when the transport connection failed and an error should be returned.
-    *
-    * @param failedChannel The Channel instance that failed.
-    * @param cause         An IOException that describes the cause of the failed connection.
-    */
-   protected void connectionFailed(Channel failedChannel, IOException cause) {
-      failureCause = IOExceptionSupport.create(cause);
-      channel = failedChannel;
-      connected.set(false);
-      handshakeFuture.setFailure(cause);
-   }
-
-   private NettyTransportSslOptions getSslOptions() {
-      return (NettyTransportSslOptions) getTransportOptions();
-   }
-
-   private void checkConnected() throws IOException {
-      if (!connected.get()) {
-         throw new IOException("Cannot send to a non-connected transport.");
-      }
-   }
-
-   //----- Handle connection events -----------------------------------------//
-
-   private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
+   private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> {
 
       private final WebSocketClientHandshaker handshaker;
 
-      NettyTcpTransportHandler() {
-         handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
-      }
-
-      @Override
-      public void handlerAdded(ChannelHandlerContext context) {
-         LOG.trace("Handler has become added! Channel is {}", context.channel());
-         handshakeFuture = context.newPromise();
+      NettyWebSocketTransportHandler() {
+         handshaker = WebSocketClientHandshakerFactory.newHandshaker(getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true,
+            new DefaultHttpHeaders());
       }
 
       @Override
       public void channelActive(ChannelHandlerContext context) throws Exception {
-         LOG.trace("Channel has become active! Channel is {}", context.channel());
          handshaker.handshake(context.channel());
-      }
 
-      @Override
-      public void channelInactive(ChannelHandlerContext context) throws Exception {
-         LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
-         if (connected.compareAndSet(true, false) && !closed.get()) {
-            LOG.trace("Firing onTransportClosed listener");
-            listener.onTransportClosed();
-         }
-      }
-
-      @Override
-      public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
-         LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
-         LOG.trace("Error Stack: ", cause);
-         if (connected.compareAndSet(true, false) && !closed.get()) {
-            LOG.trace("Firing onTransportError listener");
-            if (pendingFailure != null) {
-               listener.onTransportError(pendingFailure);
-            } else {
-               listener.onTransportError(cause);
-            }
-         } else {
-            // Hold the first failure for later dispatch if connect succeeds.
-            // This will then trigger disconnect using the first error reported.
-            if (pendingFailure != null) {
-               LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
-               pendingFailure = cause;
-            }
-
-            if (!handshakeFuture.isDone()) {
-               handshakeFuture.setFailure(cause);
-            }
-         }
+         super.channelActive(context);
       }
 
       @Override
@@ -427,16 +131,17 @@ public class NettyWSTransport implements NettyTransport {
          Channel ch = ctx.channel();
          if (!handshaker.isHandshakeComplete()) {
             handshaker.finishHandshake(ch, (FullHttpResponse) message);
-            LOG.info("WebSocket Client connected! {}", ctx.channel());
-            handshakeFuture.setSuccess();
+            LOG.trace("WebSocket Client connected! {}", ctx.channel());
+            // Now trigger super processing as we are really connected.
+            NettyWSTransport.super.handleConnected(ch);
             return;
          }
 
          // We shouldn't get this since we handle the handshake previously.
          if (message instanceof FullHttpResponse) {
             FullHttpResponse response = (FullHttpResponse) message;
-            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
-                                               ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
+            throw new IllegalStateException(
+               "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
          }
 
          WebSocketFrame frame = (WebSocketFrame) message;
@@ -446,10 +151,11 @@ public class NettyWSTransport implements NettyTransport {
             ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
          } else if (frame instanceof BinaryWebSocketFrame) {
             BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
-            LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
+            LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
             listener.onData(binaryFrame.content());
-         } else if (frame instanceof PongWebSocketFrame) {
-            LOG.trace("WebSocket Client received pong");
+         } else if (frame instanceof PingWebSocketFrame) {
+            LOG.trace("WebSocket Client received ping, response with pong");
+            ch.write(new PongWebSocketFrame(frame.content()));
          } else if (frame instanceof CloseWebSocketFrame) {
             LOG.trace("WebSocket Client received closing");
             ch.close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/4ad78c7f/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
deleted file mode 100644
index 5e2ef15..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/UnmodifiableConnection.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.util;
-
-import java.util.EnumSet;
-import java.util.Map;
-
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Collector;
-import org.apache.qpid.proton.engine.Connection;
-import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.apache.qpid.proton.engine.Link;
-import org.apache.qpid.proton.engine.Record;
-import org.apache.qpid.proton.engine.Session;
-import org.apache.qpid.proton.engine.Transport;
-import org.apache.qpid.proton.reactor.Reactor;
-
-/**
- * Unmodifiable Connection wrapper used to prevent test code from accidentally
- * modifying Connection state.
- */
-public class UnmodifiableConnection implements Connection {
-
-   private final Connection connection;
-
-   public UnmodifiableConnection(Connection connection) {
-      this.connection = connection;
-   }
-
-   @Override
-   public EndpointState getLocalState() {
-      return connection.getLocalState();
-   }
-
-   @Override
-   public EndpointState getRemoteState() {
-      return connection.getRemoteState();
-   }
-
-   @Override
-   public ErrorCondition getCondition() {
-      return connection.getCondition();
-   }
-
-   @Override
-   public void setCondition(ErrorCondition condition) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public ErrorCondition getRemoteCondition() {
-      return connection.getRemoteCondition();
-   }
-
-   @Override
-   public void free() {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public void open() {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public void close() {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public Session session() {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public Session sessionHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
-      Session head = connection.sessionHead(local, remote);
-      if (head != null) {
-         head = new UnmodifiableSession(head);
-      }
-
-      return head;
-   }
-
-   @Override
-   public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) {
-      // TODO - If implemented this method should return an unmodifiable link instance.
-      return null;
-   }
-
-   @Override
-   public Delivery getWorkHead() {
-      // TODO - If implemented this method should return an unmodifiable delivery instance.
-      return null;
-   }
-
-   @Override
-   public void setContainer(String container) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public void setHostname(String hostname) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public String getHostname() {
-      return connection.getHostname();
-   }
-
-   @Override
-   public String getRemoteContainer() {
-      return connection.getRemoteContainer();
-   }
-
-   @Override
-   public String getRemoteHostname() {
-      return connection.getRemoteHostname();
-   }
-
-   @Override
-   public void setOfferedCapabilities(Symbol[] capabilities) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public void setDesiredCapabilities(Symbol[] capabilities) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public Symbol[] getRemoteOfferedCapabilities() {
-      return connection.getRemoteOfferedCapabilities();
-   }
-
-   @Override
-   public Symbol[] getRemoteDesiredCapabilities() {
-      return connection.getRemoteDesiredCapabilities();
-   }
-
-   @Override
-   public Map<Symbol, Object> getRemoteProperties() {
-      return connection.getRemoteProperties();
-   }
-
-   @Override
-   public void setProperties(Map<Symbol, Object> properties) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public Object getContext() {
-      return connection.getContext();
-   }
-
-   @Override
-   public void setContext(Object context) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public void collect(Collector collector) {
-      throw new UnsupportedOperationException("Cannot alter the Connection");
-   }
-
-   @Override
-   public String getContainer() {
-      return connection.getContainer();
-   }
-
-   @Override
-   public Transport getTransport() {
-      return new UnmodifiableTransport(connection.getTransport());
-   }
-
-   @Override
-   public Record attachments() {
-      return connection.attachments();
-   }
-
-   @Override
-   public Reactor getReactor() {
-      return connection.getReactor();
-   }
-}