You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/07/18 20:54:52 UTC

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6339

Repository: activemq
Updated Branches:
  refs/heads/master f43c09080 -> 4b018b420


https://issues.apache.org/jira/browse/AMQ-6339

Update to latest Netty 4.0.x release

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/34d7b0bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/34d7b0bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/34d7b0bf

Branch: refs/heads/master
Commit: 34d7b0bfcb3a3d61ffe961f53ed5824d54aac232
Parents: f43c090
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 18 15:59:06 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 18 15:59:06 2016 -0400

----------------------------------------------------------------------
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/34d7b0bf/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 04f5991..40776b6 100755
--- a/pom.xml
+++ b/pom.xml
@@ -103,7 +103,7 @@
     <zookeeper-version>3.4.6</zookeeper-version>
     <qpid-proton-version>0.13.0</qpid-proton-version>
     <qpid-jms-version>0.10.0</qpid-jms-version>
-    <netty-all-version>4.0.37.Final</netty-all-version>
+    <netty-all-version>4.0.39.Final</netty-all-version>
     <regexp-version>1.3</regexp-version>
     <rome-version>1.0</rome-version>
     <saxon-version>9.5.1-5</saxon-version>


[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6339

Posted by ta...@apache.org.
https://issues.apache.org/jira/browse/AMQ-6339

Clean up the transport implementation and reduce duplication.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4b018b42
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4b018b42
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4b018b42

Branch: refs/heads/master
Commit: 4b018b420637db61763f14b8a02f986087608916
Parents: 34d7b0b
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Jul 18 16:54:37 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Jul 18 16:54:37 2016 -0400

----------------------------------------------------------------------
 .../client/transport/NettyTcpTransport.java     | 257 +++++++------
 .../amqp/client/transport/NettyTransport.java   |   2 +-
 .../client/transport/NettyTransportFactory.java |   2 +-
 .../transport/NettyTransportListener.java       |   2 +-
 .../client/transport/NettyTransportOptions.java |   6 +-
 .../transport/NettyTransportSslOptions.java     |   7 +-
 .../client/transport/NettyTransportSupport.java |   6 +-
 .../amqp/client/transport/NettyWSTransport.java | 356 ++-----------------
 8 files changed, 187 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
index 886ed4b..0980d5e 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -33,8 +33,10 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.FixedRecvByteBufAllocator;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -58,15 +60,13 @@ public class NettyTcpTransport implements NettyTransport {
     protected EventLoopGroup group;
     protected Channel channel;
     protected NettyTransportListener listener;
-    protected NettyTransportOptions options;
+    protected final NettyTransportOptions options;
     protected final URI remote;
-    protected boolean secure;
 
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
     private final CountDownLatch connectLatch = new CountDownLatch(1);
     private IOException failureCause;
-    private Throwable pendingFailure;
 
     /**
      * Create a new transport instance
@@ -91,10 +91,17 @@ public class NettyTcpTransport implements NettyTransport {
      *        the transport options used to configure the socket connection.
      */
     public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+        if (options == null) {
+            throw new IllegalArgumentException("Transport Options cannot be null");
+        }
+
+        if (remoteLocation == null) {
+            throw new IllegalArgumentException("Transport remote location cannot be null");
+        }
+
         this.options = options;
         this.listener = listener;
         this.remote = remoteLocation;
-        this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
     }
 
     @Override
@@ -104,16 +111,27 @@ public class NettyTcpTransport implements NettyTransport {
             throw new IllegalStateException("A transport listener must be set before connection attempts.");
         }
 
+        final SslHandler sslHandler;
+        if (isSSL()) {
+            try {
+                sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+            } catch (Exception ex) {
+                // TODO: can we stop it throwing Exception?
+                throw IOExceptionSupport.create(ex);
+            }
+        } else {
+            sslHandler = null;
+        }
+
         group = new NioEventLoopGroup(1);
 
         bootstrap = new Bootstrap();
         bootstrap.group(group);
         bootstrap.channel(NioSocketChannel.class);
         bootstrap.handler(new ChannelInitializer<Channel>() {
-
             @Override
             public void initChannel(Channel connectedChannel) throws Exception {
-                configureChannel(connectedChannel);
+                configureChannel(connectedChannel, sslHandler);
             }
         });
 
@@ -124,12 +142,8 @@ public class NettyTcpTransport implements NettyTransport {
 
             @Override
             public void operationComplete(ChannelFuture future) throws Exception {
-                if (future.isSuccess()) {
-                    handleConnected(future.channel());
-                } else if (future.isCancelled()) {
-                    connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
-                } else {
-                    connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+                if (!future.isSuccess()) {
+                    handleException(future.channel(), IOExceptionSupport.create(future.cause()));
                 }
             }
         });
@@ -160,8 +174,8 @@ public class NettyTcpTransport implements NettyTransport {
 
                 @Override
                 public void run() {
-                    if (pendingFailure != null) {
-                        channel.pipeline().fireExceptionCaught(pendingFailure);
+                    if (failureCause != null) {
+                        channel.pipeline().fireExceptionCaught(failureCause);
                     }
                 }
             });
@@ -175,7 +189,7 @@ public class NettyTcpTransport implements NettyTransport {
 
     @Override
     public boolean isSSL() {
-        return secure;
+        return options.isSSL();
     }
 
     @Override
@@ -222,14 +236,6 @@ public class NettyTcpTransport implements NettyTransport {
 
     @Override
     public NettyTransportOptions getTransportOptions() {
-        if (options == null) {
-            if (isSSL()) {
-                options = NettyTransportSslOptions.INSTANCE;
-            } else {
-                options = NettyTransportOptions.INSTANCE;
-            }
-        }
-
         return options;
     }
 
@@ -240,105 +246,96 @@ public class NettyTcpTransport implements NettyTransport {
 
     @Override
     public Principal getLocalPrincipal() {
-        if (!isSSL()) {
-            throw new UnsupportedOperationException("Not connected to a secure channel");
-        }
+        Principal result = null;
 
-        SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+        if (isSSL()) {
+            SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+            result = sslHandler.engine().getSession().getLocalPrincipal();
+        }
 
-        return sslHandler.engine().getSession().getLocalPrincipal();
+        return result;
     }
 
-    //----- Internal implementation details, can be overridden as needed --//
+    //----- Internal implementation details, can be overridden as needed -----//
 
     protected String getRemoteHost() {
         return remote.getHost();
     }
 
     protected int getRemotePort() {
-        int port = remote.getPort();
-
-        if (port <= 0) {
-            if (isSSL()) {
-                port = getSslOptions().getDefaultSslPort();
-            } else {
-                port = getTransportOptions().getDefaultTcpPort();
-            }
+        if (remote.getPort() != -1) {
+            return remote.getPort();
+        } else {
+            return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
         }
+    }
+
+    protected void addAdditionalHandlers(ChannelPipeline pipeline) {
 
-        return port;
     }
 
-    protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
-        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
-        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
-        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+    protected ChannelInboundHandlerAdapter createChannelHandler() {
+        return new NettyTcpTransportHandler();
+    }
 
-        if (options.getSendBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
-        }
+    //----- Event Handlers which can be overridden in subclasses -------------//
 
-        if (options.getReceiveBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
-        }
+    protected void handleConnected(Channel channel) throws Exception {
+        LOG.trace("Channel has become active! Channel is {}", channel);
+        connectionEstablished(channel);
+    }
 
-        if (options.getTrafficClass() != -1) {
-            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+    protected void handleChannelInactive(Channel channel) throws Exception {
+        LOG.trace("Channel has gone inactive! Channel is {}", channel);
+        if (connected.compareAndSet(true, false) && !closed.get()) {
+            LOG.trace("Firing onTransportClosed listener");
+            listener.onTransportClosed();
         }
     }
 
-    protected void configureChannel(final Channel channel) throws Exception {
-        if (isSSL()) {
-            SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
-            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
-                @Override
-                public void operationComplete(Future<Channel> future) throws Exception {
-                    if (future.isSuccess()) {
-                        LOG.trace("SSL Handshake has completed: {}", channel);
-                        connectionEstablished(channel);
-                    } else {
-                        LOG.trace("SSL Handshake has failed: {}", channel);
-                        connectionFailed(channel, IOExceptionSupport.create(future.cause()));
-                    }
-                }
-            });
+    protected void handleException(Channel channel, Throwable cause) throws Exception {
+        LOG.trace("Exception on channel! Channel is {}", channel);
+        if (connected.compareAndSet(true, false) && !closed.get()) {
+            LOG.trace("Firing onTransportError listener");
+            if (failureCause != null) {
+                listener.onTransportError(failureCause);
+            } else {
+                listener.onTransportError(cause);
+            }
+        } else {
+            // Hold the first failure for later dispatch if connect succeeds.
+            // This will then trigger disconnect using the first error reported.
+            if (failureCause == null) {
+                LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+                failureCause = IOExceptionSupport.create(cause);
+            }
 
-            channel.pipeline().addLast(sslHandler);
+            connectionFailed(channel, failureCause);
         }
-
-        channel.pipeline().addLast(new NettyTcpTransportHandler());
     }
 
-    protected void handleConnected(final Channel channel) throws Exception {
-        if (!isSSL()) {
-            connectionEstablished(channel);
+    //----- State change handlers and checks ---------------------------------//
+
+    protected final void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
         }
     }
 
-    //----- State change handlers and checks ---------------------------------//
-
-    /**
+    /*
      * Called when the transport has successfully connected and is ready for use.
      */
-    protected void connectionEstablished(Channel connectedChannel) {
+    private void connectionEstablished(Channel connectedChannel) {
         channel = connectedChannel;
         connected.set(true);
         connectLatch.countDown();
     }
 
-    /**
+    /*
      * Called when the transport connection failed and an error should be returned.
-     *
-     * @param failedChannel
-     *      The Channel instance that failed.
-     * @param cause
-     *      An IOException that describes the cause of the failed connection.
      */
-    protected void connectionFailed(Channel failedChannel, IOException cause) {
-        failureCause = IOExceptionSupport.create(cause);
+    private void connectionFailed(Channel failedChannel, IOException cause) {
+        failureCause = cause;
         channel = failedChannel;
         connected.set(false);
         connectLatch.countDown();
@@ -348,49 +345,83 @@ public class NettyTcpTransport implements NettyTransport {
         return (NettyTransportSslOptions) getTransportOptions();
     }
 
-    private void checkConnected() throws IOException {
-        if (!connected.get()) {
-            throw new IOException("Cannot send to a non-connected transport.");
+    private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        if (options.getSendBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+        }
+
+        if (options.getReceiveBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+        }
+
+        if (options.getTrafficClass() != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
         }
     }
 
+    private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
+        if (isSSL()) {
+            channel.pipeline().addLast(sslHandler);
+        }
+
+        addAdditionalHandlers(channel.pipeline());
+
+        channel.pipeline().addLast(createChannelHandler());
+    }
+
     //----- Handle connection events -----------------------------------------//
 
-    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
+
+        @Override
+        public void channelRegistered(ChannelHandlerContext context) throws Exception {
+            channel = context.channel();
+        }
 
         @Override
         public void channelActive(ChannelHandlerContext context) throws Exception {
-            LOG.trace("Channel has become active! Channel is {}", context.channel());
+            // In the Secure case we need to let the handshake complete before we
+            // trigger the connected event.
+            if (!isSSL()) {
+                handleConnected(context.channel());
+            } else {
+                SslHandler sslHandler = context.pipeline().get(SslHandler.class);
+                sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+                    @Override
+                    public void operationComplete(Future<Channel> future) throws Exception {
+                        if (future.isSuccess()) {
+                            LOG.trace("SSL Handshake has completed: {}", channel);
+                            handleConnected(channel);
+                        } else {
+                            LOG.trace("SSL Handshake has failed: {}", channel);
+                            handleException(channel, future.cause());
+                        }
+                    }
+                });
+            }
         }
 
         @Override
         public void channelInactive(ChannelHandlerContext context) throws Exception {
-            LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
-            if (connected.compareAndSet(true, false) && !closed.get()) {
-                LOG.trace("Firing onTransportClosed listener");
-                listener.onTransportClosed();
-            }
+            handleChannelInactive(context.channel());
         }
 
         @Override
         public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
-            LOG.trace("Exception on channel! Channel is {}", context.channel());
-            if (connected.compareAndSet(true, false) && !closed.get()) {
-                LOG.trace("Firing onTransportError listener");
-                if (pendingFailure != null) {
-                    listener.onTransportError(pendingFailure);
-                } else {
-                    listener.onTransportError(cause);
-                }
-            } else {
-                // Hold the first failure for later dispatch if connect succeeds.
-                // This will then trigger disconnect using the first error reported.
-                if (pendingFailure != null) {
-                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
-                    pendingFailure = cause;
-                }
-            }
+            handleException(context.channel(), cause);
         }
+    }
+
+    //----- Handle Binary data from connection -------------------------------//
+
+    protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
index 48be3a2..a5d0214 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
@@ -23,7 +23,7 @@ import java.security.Principal;
 import io.netty.buffer.ByteBuf;
 
 /**
- *
+ * Base for all Netty based Transports in this client.
  */
 public interface NettyTransport {
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
index cc47aa2..a06b4d0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
index 09959a3..1b3a91c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
index 7b3dc9b..add924c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -163,6 +163,10 @@ public class NettyTransportOptions implements Cloneable {
         this.defaultTcpPort = defaultTcpPort;
     }
 
+    public boolean isSSL() {
+        return false;
+    }
+
     @Override
     public NettyTransportOptions clone() {
         return copyOptions(new NettyTransportOptions());

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
index b01f884..f81b8b3 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -262,6 +262,11 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
     }
 
     @Override
+    public boolean isSSL() {
+        return true;
+    }
+
+    @Override
     public NettyTransportSslOptions clone() {
         return copyOptions(new NettyTransportSslOptions());
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
index 18f9630..653e915 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.transport.amqp.client.transport;
 
-import io.netty.handler.ssl.SslHandler;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
@@ -44,6 +42,8 @@ import javax.net.ssl.X509TrustManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.handler.ssl.SslHandler;
+
 /**
  * Static class that provides various utility methods used by Transport implementations.
  */

http://git-wip-us.apache.org/repos/asf/activemq/blob/4b018b42/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
index 1b604fe..c2fcfe7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -18,68 +18,38 @@ package org.apache.activemq.transport.amqp.client.transport;
 
 import java.io.IOException;
 import java.net.URI;
-import java.security.Principal;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.nio.charset.StandardCharsets;
 
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.bootstrap.Bootstrap;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.DefaultHttpHeaders;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpClientCodec;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
 import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
 import io.netty.handler.codec.http.websocketx.WebSocketFrame;
 import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 
 /**
  * Transport for communicating over WebSockets
  */
-public class NettyWSTransport implements NettyTransport {
+public class NettyWSTransport extends NettyTcpTransport {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
 
-    private static final int QUIET_PERIOD = 20;
-    private static final int SHUTDOWN_TIMEOUT = 100;
-
-    protected Bootstrap bootstrap;
-    protected EventLoopGroup group;
-    protected Channel channel;
-    protected NettyTransportListener listener;
-    protected NettyTransportOptions options;
-    protected final URI remote;
-    protected boolean secure;
-
-    private final AtomicBoolean connected = new AtomicBoolean();
-    private final AtomicBoolean closed = new AtomicBoolean();
-    private ChannelPromise handshakeFuture;
-    private IOException failureCause;
-    private Throwable pendingFailure;
+    private static final String AMQP_SUB_PROTOCOL = "amqp";
 
     /**
      * Create a new transport instance
@@ -104,114 +74,7 @@ public class NettyWSTransport implements NettyTransport {
      *        the transport options used to configure the socket connection.
      */
     public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
-        this.options = options;
-        this.listener = listener;
-        this.remote = remoteLocation;
-        this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
-    }
-
-    @Override
-    public void connect() throws IOException {
-
-        if (listener == null) {
-            throw new IllegalStateException("A transport listener must be set before connection attempts.");
-        }
-
-        group = new NioEventLoopGroup(1);
-
-        bootstrap = new Bootstrap();
-        bootstrap.group(group);
-        bootstrap.channel(NioSocketChannel.class);
-        bootstrap.handler(new ChannelInitializer<Channel>() {
-
-            @Override
-            public void initChannel(Channel connectedChannel) throws Exception {
-                configureChannel(connectedChannel);
-            }
-        });
-
-        configureNetty(bootstrap, getTransportOptions());
-
-        ChannelFuture future;
-        try {
-            future = bootstrap.connect(getRemoteHost(), getRemotePort());
-            future.addListener(new ChannelFutureListener() {
-
-                @Override
-                public void operationComplete(ChannelFuture future) throws Exception {
-                    if (future.isSuccess()) {
-                        handleConnected(future.channel());
-                    } else if (future.isCancelled()) {
-                        connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
-                    } else {
-                        connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
-                    }
-                }
-            });
-
-            future.sync();
-
-            // Now wait for WS protocol level handshake completion
-            handshakeFuture.await();
-        } catch (InterruptedException ex) {
-            LOG.debug("Transport connection attempt was interrupted.");
-            Thread.interrupted();
-            failureCause = IOExceptionSupport.create(ex);
-        }
-
-        if (failureCause != null) {
-            // Close out any Netty resources now as they are no longer needed.
-            if (channel != null) {
-                channel.close().syncUninterruptibly();
-                channel = null;
-            }
-            if (group != null) {
-                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-                group = null;
-            }
-
-            throw failureCause;
-        } else {
-            // Connected, allow any held async error to fire now and close the transport.
-            channel.eventLoop().execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    if (pendingFailure != null) {
-                        channel.pipeline().fireExceptionCaught(pendingFailure);
-                    }
-                }
-            });
-        }
-    }
-
-    @Override
-    public boolean isConnected() {
-        return connected.get();
-    }
-
-    @Override
-    public boolean isSSL() {
-        return secure;
-    }
-
-    @Override
-    public void close() throws IOException {
-        if (closed.compareAndSet(false, true)) {
-            connected.set(false);
-            if (channel != null) {
-                channel.close().syncUninterruptibly();
-            }
-            if (group != null) {
-                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    @Override
-    public ByteBuf allocateSendBuffer(int size) throws IOException {
-        checkConnected();
-        return channel.alloc().ioBuffer(size, size);
+        super(listener, remoteLocation, options);
     }
 
     @Override
@@ -228,206 +91,37 @@ public class NettyWSTransport implements NettyTransport {
     }
 
     @Override
-    public NettyTransportListener getTransportListener() {
-        return listener;
+    protected ChannelInboundHandlerAdapter createChannelHandler() {
+        return new NettyWebSocketTransportHandler();
     }
 
     @Override
-    public void setTransportListener(NettyTransportListener listener) {
-        this.listener = listener;
+    protected void addAdditionalHandlers(ChannelPipeline pipeline) {
+        pipeline.addLast(new HttpClientCodec());
+        pipeline.addLast(new HttpObjectAggregator(8192));
     }
 
     @Override
-    public NettyTransportOptions getTransportOptions() {
-        if (options == null) {
-            if (isSSL()) {
-                options = NettyTransportSslOptions.INSTANCE;
-            } else {
-                options = NettyTransportOptions.INSTANCE;
-            }
-        }
-
-        return options;
-    }
-
-    @Override
-    public URI getRemoteLocation() {
-        return remote;
-    }
-
-    @Override
-    public Principal getLocalPrincipal() {
-        if (!isSSL()) {
-            throw new UnsupportedOperationException("Not connected to a secure channel");
-        }
-
-        SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
-
-        return sslHandler.engine().getSession().getLocalPrincipal();
-    }
-
-    //----- Internal implementation details, can be overridden as needed --//
-
-    protected String getRemoteHost() {
-        return remote.getHost();
-    }
-
-    protected int getRemotePort() {
-        int port = remote.getPort();
-
-        if (port <= 0) {
-            if (isSSL()) {
-                port = getSslOptions().getDefaultSslPort();
-            } else {
-                port = getTransportOptions().getDefaultTcpPort();
-            }
-        }
-
-        return port;
-    }
-
-    protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
-        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
-        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
-        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
-
-        if (options.getSendBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
-        }
-
-        if (options.getReceiveBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
-        }
-
-        if (options.getTrafficClass() != -1) {
-            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
-        }
-    }
-
-    protected void configureChannel(final Channel channel) throws Exception {
-        if (isSSL()) {
-            SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
-            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
-                @Override
-                public void operationComplete(Future<Channel> future) throws Exception {
-                    if (future.isSuccess()) {
-                        LOG.trace("SSL Handshake has completed: {}", channel);
-                        connectionEstablished(channel);
-                    } else {
-                        LOG.trace("SSL Handshake has failed: {}", channel);
-                        connectionFailed(channel, IOExceptionSupport.create(future.cause()));
-                    }
-                }
-            });
-
-            channel.pipeline().addLast(sslHandler);
-        }
-
-        channel.pipeline().addLast(new HttpClientCodec());
-        channel.pipeline().addLast(new HttpObjectAggregator(8192));
-        channel.pipeline().addLast(new NettyTcpTransportHandler());
-    }
-
-    protected void handleConnected(final Channel channel) throws Exception {
-        if (!isSSL()) {
-            connectionEstablished(channel);
-        }
-    }
-
-    //----- State change handlers and checks ---------------------------------//
-
-    /**
-     * Called when the transport has successfully connected and is ready for use.
-     */
-    protected void connectionEstablished(Channel connectedChannel) {
-        LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
-        channel = connectedChannel;
-        connected.set(true);
-    }
-
-    /**
-     * Called when the transport connection failed and an error should be returned.
-     *
-     * @param failedChannel
-     *      The Channel instance that failed.
-     * @param cause
-     *      An IOException that describes the cause of the failed connection.
-     */
-    protected void connectionFailed(Channel failedChannel, IOException cause) {
-        failureCause = IOExceptionSupport.create(cause);
-        channel = failedChannel;
-        connected.set(false);
-        handshakeFuture.setFailure(cause);
-    }
-
-    private NettyTransportSslOptions getSslOptions() {
-        return (NettyTransportSslOptions) getTransportOptions();
-    }
-
-    private void checkConnected() throws IOException {
-        if (!connected.get()) {
-            throw new IOException("Cannot send to a non-connected transport.");
-        }
+    protected void handleConnected(Channel channel) throws Exception {
+        LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel);
     }
 
     //----- Handle connection events -----------------------------------------//
 
-    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
+    private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> {
 
         private final WebSocketClientHandshaker handshaker;
 
-        public NettyTcpTransportHandler() {
+        public NettyWebSocketTransportHandler() {
             handshaker = WebSocketClientHandshakerFactory.newHandshaker(
-                remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
-        }
-
-        @Override
-        public void handlerAdded(ChannelHandlerContext context) {
-            LOG.trace("Handler has become added! Channel is {}", context.channel());
-            handshakeFuture = context.newPromise();
+                getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL, true, new DefaultHttpHeaders());
         }
 
         @Override
         public void channelActive(ChannelHandlerContext context) throws Exception {
-            LOG.trace("Channel has become active! Channel is {}", context.channel());
             handshaker.handshake(context.channel());
-        }
-
-        @Override
-        public void channelInactive(ChannelHandlerContext context) throws Exception {
-            LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
-            if (connected.compareAndSet(true, false) && !closed.get()) {
-                LOG.trace("Firing onTransportClosed listener");
-                listener.onTransportClosed();
-            }
-        }
 
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
-            LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
-            LOG.trace("Error Stack: ", cause);
-            if (connected.compareAndSet(true, false) && !closed.get()) {
-                LOG.trace("Firing onTransportError listener");
-                if (pendingFailure != null) {
-                    listener.onTransportError(pendingFailure);
-                } else {
-                    listener.onTransportError(cause);
-                }
-            } else {
-                // Hold the first failure for later dispatch if connect succeeds.
-                // This will then trigger disconnect using the first error reported.
-                if (pendingFailure != null) {
-                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
-                    pendingFailure = cause;
-                }
-
-                if (!handshakeFuture.isDone()) {
-                    handshakeFuture.setFailure(cause);
-                }
-            }
+            super.channelActive(context);
         }
 
         @Override
@@ -437,8 +131,9 @@ public class NettyWSTransport implements NettyTransport {
             Channel ch = ctx.channel();
             if (!handshaker.isHandshakeComplete()) {
                 handshaker.finishHandshake(ch, (FullHttpResponse) message);
-                LOG.info("WebSocket Client connected! {}", ctx.channel());
-                handshakeFuture.setSuccess();
+                LOG.trace("WebSocket Client connected! {}", ctx.channel());
+                // Now trigger super processing as we are really connected.
+                NettyWSTransport.super.handleConnected(ch);
                 return;
             }
 
@@ -447,7 +142,7 @@ public class NettyWSTransport implements NettyTransport {
                 FullHttpResponse response = (FullHttpResponse) message;
                 throw new IllegalStateException(
                     "Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
-                    ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
+                    ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
             }
 
             WebSocketFrame frame = (WebSocketFrame) message;
@@ -457,10 +152,11 @@ public class NettyWSTransport implements NettyTransport {
                 ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
             } else if (frame instanceof BinaryWebSocketFrame) {
                 BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
-                LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
+                LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
                 listener.onData(binaryFrame.content());
-            } else if (frame instanceof PongWebSocketFrame) {
-                LOG.trace("WebSocket Client received pong");
+            } else if (frame instanceof PingWebSocketFrame) {
+                LOG.trace("WebSocket Client received ping, response with pong");
+                ch.write(new PongWebSocketFrame(frame.content()));
             } else if (frame instanceof CloseWebSocketFrame) {
                 LOG.trace("WebSocket Client received closing");
                 ch.close();