You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/30 19:14:23 UTC

[3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6339

https://issues.apache.org/jira/browse/AMQ-6339

Add support for AMQP client to connect using WebSockets.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31c55f75
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31c55f75
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31c55f75

Branch: refs/heads/master
Commit: 31c55f75108b06020eb6206c52361b04f49656a9
Parents: 83827f2
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 17 16:26:52 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 30 14:52:40 2016 -0400

----------------------------------------------------------------------
 activemq-amqp/pom.xml                           |  45 ++
 .../transport/amqp/AmqpProtocolConverter.java   |   8 +-
 .../transport/amqp/AmqpWSTransport.java         | 143 ++++++
 .../transport/amqp/AmqpWSTransportFactory.java  |  88 ++++
 .../transport/amqp/protocol/AmqpConnection.java |   7 +-
 .../transport/amqp/sasl/AmqpAuthenticator.java  |   4 +-
 .../org/apache/activemq/transport/amqp+ws       |  17 +
 .../org/apache/activemq/transport/amqp+wss      |  17 +
 .../transport/amqp/AmqpAndMqttTest.java         | 137 ++++++
 .../transport/amqp/AmqpTestSupport.java         |  49 ++
 .../transport/amqp/JMSClientSslTest.java        |  13 -
 .../activemq/transport/amqp/JMSClientTest.java  |   6 +-
 .../transport/amqp/client/AmqpClient.java       |   2 +-
 .../amqp/client/AmqpClientTestSupport.java      |  36 +-
 .../transport/amqp/client/AmqpConnection.java   |  10 +-
 .../client/transport/NettyTcpTransport.java     | 401 ++++++++++++++++
 .../amqp/client/transport/NettyTransport.java   | 370 +--------------
 .../client/transport/NettyTransportFactory.java |  17 +-
 .../transport/NettyTransportSslOptions.java     |   2 +-
 .../amqp/client/transport/NettyWSTransport.java | 470 +++++++++++++++++++
 .../AmqpBrokerReuqestedHearbeatsTest.java       |  18 +
 .../AmqpClientRequestsHeartbeatsTest.java       |  18 +
 .../amqp/interop/AmqpConnectionsTest.java       |  20 +
 .../src/test/resources/log4j.properties         |   2 +-
 .../activemq/transport/vm/VMTransport.java      |  17 +
 .../apache/activemq/transport/Transport.java    |  25 +
 .../activemq/transport/TransportFilter.java     |  42 +-
 .../transport/failover/FailoverTransport.java   |  28 +-
 .../transport/fanout/FanoutTransport.java       |  71 +--
 .../activemq/transport/mock/MockTransport.java  |  70 ++-
 .../activemq/transport/tcp/SslTransport.java    |  11 +-
 .../activemq/transport/tcp/TcpTransport.java    |  19 +-
 .../activemq/transport/udp/UdpTransport.java    |  25 +-
 .../activemq/transport/ws/WSTransport.java      |  95 ++++
 activemq-http/pom.xml                           |   2 +-
 .../transport/http/BlockingQueueTransport.java  |  27 +-
 .../transport/http/HttpClientTransport.java     |  15 +
 .../transport/util/HttpTransportUtils.java      |  21 +
 .../transport/ws/AbstractStompSocket.java       |   8 +-
 .../transport/ws/StompWSConnection.java         |  11 +-
 .../transport/ws/WSTransportFactory.java        |  12 +-
 .../activemq/transport/ws/WSTransportProxy.java | 270 +++++++++++
 .../transport/ws/WSTransportServer.java         |  19 +-
 .../activemq/transport/ws/jetty9/WSServlet.java | 127 ++++-
 .../transport/wss/WSSTransportFactory.java      |  12 +-
 .../activemq/transport/ws/MQTTWSConnection.java |  10 +-
 .../ws/StompWSConnectionTimeoutTest.java        |   4 -
 .../transport/ws/WSTransportTestSupport.java    |  16 +-
 .../src/test/resources/log4j.properties         |   4 +-
 activemq-unit-tests/pom.xml                     |   4 -
 .../activemq/conversions/AmqpAndMqttTest.java   | 136 ------
 .../activemq/transport/StubTransport.java       |  25 +-
 .../auto/AutoTransportConfigureTest.java        |  30 +-
 pom.xml                                         |   2 +-
 54 files changed, 2362 insertions(+), 696 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
index 7c8320f..dba2dac 100644
--- a/activemq-amqp/pom.xml
+++ b/activemq-amqp/pom.xml
@@ -94,6 +94,21 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-http</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-mqtt</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty.websocket</groupId>
+      <artifactId>websocket-server</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.springframework</groupId>
       <artifactId>spring-context</artifactId>
       <scope>test</scope>
@@ -123,6 +138,36 @@
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-buffer</artifactId>
+      <version>${netty-all-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-common</artifactId>
+      <version>${netty-all-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-handler</artifactId>
+      <version>${netty-all-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http</artifactId>
+      <version>${netty-all-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-transport</artifactId>
+      <version>${netty-all-version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <profiles>

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 6b9a178..9234bf5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -22,13 +22,13 @@ import org.apache.activemq.command.Command;
 
 /**
  * Interface that defines the API for any AMQP protocol converter ised to
- * map AMQP mechanincs to ActiveMQ and back.
+ * map AMQP mechanics to ActiveMQ and back.
  */
 public interface AmqpProtocolConverter {
 
     /**
      * A new incoming data packet from the remote peer is handed off to the
-     * protocol converter for porcessing.  The type can vary and be either an
+     * protocol converter for processing.  The type can vary and be either an
      * AmqpHeader at the handshake phase or a byte buffer containing the next
      * incoming frame data from the remote.
      *
@@ -70,9 +70,9 @@ public interface AmqpProtocolConverter {
      * empty frames or closing connections due to remote end being inactive
      * for to long.
      *
-     * @returns the amount of milliseconds to wait before performaing another check.
+     * @returns the amount of milliseconds to wait before performing another check.
      *
-     * @throws IOException if an error occurs on writing heatbeats to the wire.
+     * @throws IOException if an error occurs on writing heart-beats to the wire.
      */
     long keepAlive() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
new file mode 100644
index 0000000..2ec3a09
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.amqp.AmqpFrameParser.AMQPFrameSink;
+import org.apache.activemq.transport.ws.WSTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * An AMQP based WebSocket transport implementation.
+ */
+public class AmqpWSTransport extends TransportSupport implements WSTransport, AMQPFrameSink {
+
+    private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
+    private final URI remoteLocation;
+
+    private WSTransportSink outputSink;
+    private int receiveCounter;
+    private X509Certificate[] certificates;
+
+    /**
+     * Create a new Transport instance.
+     *
+     * @param location
+     *      the remote location where the client connection is from.
+     * @param wireFormat
+     *      the WireFormat instance that configures this Transport.
+     */
+    public AmqpWSTransport(URI location, WireFormat wireFormat) {
+        super();
+
+        remoteLocation = location;
+        frameReader.setWireFormat((AmqpWireFormat) wireFormat);
+    }
+
+    @Override
+    public void setTransportSink(WSTransportSink outputSink) {
+        this.outputSink = outputSink;
+    }
+
+    @Override
+    public void oneway(Object command) throws IOException {
+        if (command instanceof ByteBuffer) {
+            outputSink.onSocketOutboundBinary((ByteBuffer) command);
+        } else {
+            throw new IOException("Unexpected output command.");
+        }
+    }
+
+    @Override
+    public String getRemoteAddress() {
+        return remoteLocation.toASCIIString();
+    }
+
+    @Override
+    public int getReceiveCounter() {
+        return receiveCounter;
+    }
+
+    @Override
+    public X509Certificate[] getPeerCertificates() {
+        return certificates;
+    }
+
+    @Override
+    public void setPeerCertificates(X509Certificate[] certificates) {
+        this.certificates = certificates;
+    }
+
+    @Override
+    public String getSubProtocol() {
+        return "amqp";
+    }
+
+    @Override
+    public WireFormat getWireFormat() {
+        return frameReader.getWireFormat();
+    }
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        // Currently nothing needed here since we have no async workers.
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        if (outputSink == null) {
+            throw new IllegalStateException("Transport started before output sink assigned.");
+        }
+
+        // Currently nothing needed here since we have no async workers.
+    }
+
+    //----- WebSocket event hooks --------------------------------------------//
+
+    @Override
+    public void onWebSocketText(String data) throws IOException {
+        onException(new IOException("Illegal text content receive on AMQP WebSocket channel."));
+    }
+
+    @Override
+    public void onWebSocketBinary(ByteBuffer data) throws IOException {
+        try {
+            frameReader.parse(data);
+        } catch (Exception e) {
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    @Override
+    public void onWebSocketClosed() throws IOException {
+        onException(new IOException("Unexpected close of AMQP WebSocket channel."));
+    }
+
+    //----- AMQP Frame Data event hook ---------------------------------------//
+
+    @Override
+    public void onFrame(Object frame) {
+        doConsume(frame);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java
new file mode 100644
index 0000000..b85a827
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Factory for creating WebSocket aware AMQP Transports.
+ */
+public class AmqpWSTransportFactory extends TransportFactory implements BrokerServiceAware {
+
+    private BrokerService brokerService = null;
+
+    @Override
+    protected String getDefaultWireFormatType() {
+        return "amqp";
+    }
+
+    @Override
+    public TransportServer doBind(URI location) throws IOException {
+        throw new IOException("doBind() method not implemented! No Server over WS implemented.");
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+        AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService);
+
+        Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
+
+        IntrospectionSupport.setProperties(amqpTransport, options);
+        IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions);
+
+        // Now wrap the filter with the monitor
+        transport = createInactivityMonitor(amqpTransport, format);
+        IntrospectionSupport.setProperties(transport, options);
+
+        return super.compositeConfigure(transport, format, options);
+    }
+
+    /**
+     * Factory method to create a new transport
+     *
+     * @throws IOException
+     * @throws UnknownHostException
+     */
+    @Override
+    protected Transport createTransport(URI location, WireFormat wireFormat) throws MalformedURLException, UnknownHostException, IOException {
+        return new AmqpWSTransport(location, wireFormat);
+    }
+
+    @Override
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerService = brokerService;
+    }
+
+    protected Transport createInactivityMonitor(AmqpTransportFilter transport, WireFormat format) {
+        AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+        transport.setInactivityMonitor(monitor);
+        return monitor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 8b86132..aa9b577 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -309,7 +309,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
             while (!done) {
                 ByteBuffer toWrite = protonTransport.getOutputBuffer();
                 if (toWrite != null && toWrite.hasRemaining()) {
-                    LOG.trace("Sending {} bytes out", toWrite.limit());
+                    LOG.trace("Server: Sending {} bytes out", toWrite.limit());
                     amqpTransport.sendToAmqp(toWrite);
                     protonTransport.outputConsumed();
                 } else {
@@ -356,6 +356,8 @@ public class AmqpConnection implements AmqpProtocolConverter {
             return;
         }
 
+        LOG.trace("Server: Received from client: {} bytes", frame.getLength());
+
         while (frame.length > 0) {
             try {
                 int count = protonTransport.input(frame.data, frame.offset, frame.length);
@@ -386,7 +388,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
             Event event = null;
             while ((event = eventCollector.peek()) != null) {
                 if (amqpTransport.isTrace()) {
-                    LOG.trace("Processing event: {}", event.getType());
+                    LOG.trace("Server: Processing event: {}", event.getType());
                 }
                 switch (event.getType()) {
                     case CONNECTION_REMOTE_OPEN:
@@ -484,7 +486,6 @@ public class AmqpConnection implements AmqpProtocolConverter {
 
                         protonConnection.close();
                     } else {
-
                         if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
                             LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
                             protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
index 379c5f9..2309374 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
@@ -54,14 +54,14 @@ public class AmqpAuthenticator {
     }
 
     /**
-     * @return true if the SASL exchange has conpleted, regardless of success.
+     * @return true if the SASL exchange has completed, regardless of success.
      */
     public boolean isDone() {
         return sasl.getOutcome() != Sasl.SaslOutcome.PN_SASL_NONE;
     }
 
     /**
-     * @return the list of all SASL mechanisms that are supported curretnly.
+     * @return the list of all SASL mechanisms that are supported currently.
      */
     public String[] getSupportedMechanisms() {
         return mechanisms;

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws
new file mode 100644
index 0000000..cfadf68
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss
new file mode 100644
index 0000000..cfadf68
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java
new file mode 100644
index 0000000..ac7a199
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AmqpAndMqttTest {
+
+    protected BrokerService broker;
+    private TransportConnector amqpConnector;
+    private TransportConnector mqttConnector;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setAdvisorySupport(false);
+        broker.setSchedulerSupport(false);
+
+        amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
+        mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
+
+        return broker;
+    }
+
+    @Test(timeout = 60000)
+    public void testFromMqttToAmqp() throws Exception {
+        Connection amqp = createAmqpConnection();
+        Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO"));
+
+        final BlockingConnection mqtt = createMQTTConnection().blockingConnection();
+        mqtt.connect();
+        byte[] payload = bytes("Hello World");
+        mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
+        mqtt.disconnect();
+
+        Message msg = consumer.receive(1000 * 5);
+        assertNotNull(msg);
+        assertTrue(msg instanceof BytesMessage);
+
+        BytesMessage bmsg = (BytesMessage) msg;
+        byte[] actual = new byte[(int) bmsg.getBodyLength()];
+        bmsg.readBytes(actual);
+        assertTrue(Arrays.equals(actual, payload));
+        amqp.close();
+    }
+
+    private byte[] bytes(String value) {
+        try {
+            return value.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected MQTT createMQTTConnection() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
+        return mqtt;
+    }
+
+    public Connection createAmqpConnection() throws Exception {
+
+        String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort();
+
+        final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
+
+        factory.setUsername("admin");
+        factory.setPassword("password");
+
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+
+        connection.start();
+        return connection;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index af6a63f..12c0a17 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.ServerSocket;
 import java.net.URI;
 import java.security.SecureRandom;
 import java.util.Set;
@@ -33,6 +34,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.net.ServerSocketFactory;
 import javax.net.ssl.KeyManager;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.TrustManager;
@@ -79,6 +81,11 @@ public class AmqpTestSupport {
     protected URI amqpNioPlusSslURI;
     protected int amqpNioPlusSslPort;
 
+    protected URI amqpWsURI;
+    protected int amqpWsPort;
+    protected URI amqpWssURI;
+    protected int amqpWssPort;
+
     protected URI autoURI;
     protected int autoPort;
     protected URI autoSslURI;
@@ -213,6 +220,20 @@ public class AmqpTestSupport {
             autoNioPlusSslURI = connector.getPublishableConnectURI();
             LOG.debug("Using auto+nio+ssl port " + autoNioPlusSslPort);
         }
+        if (isUseWsConnector()) {
+            connector = brokerService.addConnector(
+                "ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
+            amqpWsPort = connector.getConnectUri().getPort();
+            amqpWsURI = connector.getPublishableConnectURI();
+            LOG.debug("Using amqp+ws port " + amqpWsPort);
+        }
+        if (isUseWssConnector()) {
+            connector = brokerService.addConnector(
+                "wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
+            amqpWssPort = connector.getConnectUri().getPort();
+            amqpWssURI = connector.getPublishableConnectURI();
+            LOG.debug("Using amqp+wss port " + amqpWssPort);
+        }
     }
 
     protected boolean isPersistent() {
@@ -263,6 +284,14 @@ public class AmqpTestSupport {
         return false;
     }
 
+    protected boolean isUseWsConnector() {
+        return false;
+    }
+
+    protected boolean isUseWssConnector() {
+        return false;
+    }
+
     protected String getAmqpTransformer() {
         return "jms";
     }
@@ -355,6 +384,26 @@ public class AmqpTestSupport {
         return name.getMethodName();
     }
 
+    protected int getProxyPort(int proxyPort) {
+        if (proxyPort == 0) {
+            ServerSocket ss = null;
+            try {
+                ss = ServerSocketFactory.getDefault().createServerSocket(0);
+                proxyPort = ss.getLocalPort();
+            } catch (IOException e) { // ignore
+            } finally {
+                try {
+                    if (ss != null ) {
+                        ss.close();
+                    }
+                } catch (IOException e) { // ignore
+                }
+            }
+        }
+
+        return proxyPort;
+    }
+
     protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
         ObjectName brokerViewMBean = new ObjectName(
             "org.apache.activemq:type=Broker,brokerName=localhost");

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
index a8acb7d..b0bd3a7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
@@ -17,13 +17,7 @@
 package org.apache.activemq.transport.amqp;
 
 import java.net.URI;
-import java.security.SecureRandom;
 
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.junit.BeforeClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,13 +27,6 @@ import org.slf4j.LoggerFactory;
 public class JMSClientSslTest extends JMSClientTest {
     protected static final Logger LOG = LoggerFactory.getLogger(JMSClientSslTest.class);
 
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        SSLContext ctx = SSLContext.getInstance("TLS");
-        ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
-        SSLContext.setDefault(ctx);
-    }
-
     @Override
     protected URI getBrokerURI() {
         return amqpSslURI;

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 647ff13..87dc9ee 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -57,7 +57,7 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
 import org.apache.activemq.broker.jmx.QueueViewMBean;
 import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
 import org.apache.activemq.util.Wait;
-import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
 import org.junit.Test;
 import org.objectweb.jtests.jms.framework.TestConfig;
 import org.slf4j.Logger;
@@ -1180,8 +1180,8 @@ public class JMSClientTest extends JMSClientTestSupport {
 
     @Test(timeout = 60000)
     public void testZeroPrefetchWithTwoConsumers() throws Exception {
-        connection = createConnection();
-        ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+        JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
+        connection = cf.createConnection();
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index 738b0eb..78b1aa0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -21,8 +21,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
 import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
+import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
index 5504954..123f86c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
@@ -51,12 +51,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
 
     @Override
     protected boolean isUseTcpConnector() {
-        return !isUseSSL() && !connectorScheme.contains("nio");
+        return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
     }
 
     @Override
     protected boolean isUseSslConnector() {
-        return isUseSSL() && !connectorScheme.contains("nio");
+        return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
     }
 
     @Override
@@ -69,13 +69,33 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
         return isUseSSL() && connectorScheme.contains("nio");
     }
 
+    @Override
+    protected boolean isUseWsConnector() {
+        return !isUseSSL() && connectorScheme.contains("ws");
+    }
+
+    @Override
+    protected boolean isUseWssConnector() {
+        return isUseSSL() && connectorScheme.contains("wss");
+    }
+
     public URI getBrokerAmqpConnectionURI() {
+        boolean webSocket = false;
+
         try {
             int port = 0;
             switch (connectorScheme) {
                 case "amqp":
                     port = this.amqpPort;
                     break;
+                case "amqp+ws":
+                    port = this.amqpWsPort;
+                    webSocket = true;
+                    break;
+                case "amqp+wss":
+                    port = this.amqpWssPort;
+                    webSocket = true;
+                    break;
                 case "amqp+ssl":
                     port = this.amqpSslPort;
                     break;
@@ -92,9 +112,17 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
             String uri = null;
 
             if (isUseSSL()) {
-                uri = "ssl://127.0.0.1:" + port;
+                if (webSocket) {
+                    uri = "wss://127.0.0.1:" + port;
+                } else {
+                    uri = "ssl://127.0.0.1:" + port;
+                }
             } else {
-                uri = "tcp://127.0.0.1:" + port;
+                if (webSocket) {
+                    uri = "ws://127.0.0.1:" + port;
+                } else {
+                    uri = "tcp://127.0.0.1:" + port;
+                }
             }
 
             if (!getAmqpConnectionURIOptions().isEmpty()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index acda553..85a1d22 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.transport.InactivityIOException;
 import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
 import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
 import org.apache.activemq.transport.amqp.client.util.AsyncResult;
 import org.apache.activemq.transport.amqp.client.util.ClientFuture;
@@ -79,7 +78,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     private final AtomicLong sessionIdGenerator = new AtomicLong();
     private final AtomicLong txIdGenerator = new AtomicLong();
     private final Collector protonCollector = new CollectorImpl();
-    private final NettyTransport transport;
+    private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport;
     private final Transport protonTransport = Transport.Factory.create();
 
     private final String username;
@@ -103,7 +102,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
     private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
 
-    public AmqpConnection(NettyTransport transport, String username, String password) {
+    public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
         setEndpoint(Connection.Factory.create());
         getEndpoint().collect(protonCollector);
 
@@ -490,7 +489,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
             @Override
             public void run() {
                 ByteBuffer source = incoming.nioBuffer();
-                LOG.trace("Received from Broker {} bytes:", source.remaining());
+                LOG.trace("Client Received from Broker {} bytes:", source.remaining());
 
                 if (protonTransport.isClosed()) {
                     LOG.debug("Ignoring incoming data because transport is closed");
@@ -520,6 +519,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
     @Override
     public void onTransportClosed() {
         LOG.debug("The transport has unexpectedly closed");
+        failed(getOpenAbortException());
     }
 
     @Override
@@ -612,7 +612,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
             Event protonEvent = null;
             while ((protonEvent = protonCollector.peek()) != null) {
                 if (!protonEvent.getType().equals(Type.TRANSPORT)) {
-                    LOG.trace("New Proton Event: {}", protonEvent.getType());
+                    LOG.trace("Client: New Proton Event: {}", protonEvent.getType());
                 }
 
                 AmqpEventSink amqpEventSink = null;

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
new file mode 100644
index 0000000..886ed4b
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ */
+public class NettyTcpTransport implements NettyTransport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+    private static final int QUIET_PERIOD = 20;
+    private static final int SHUTDOWN_TIMEOUT = 100;
+
+    protected Bootstrap bootstrap;
+    protected EventLoopGroup group;
+    protected Channel channel;
+    protected NettyTransportListener listener;
+    protected NettyTransportOptions options;
+    protected final URI remote;
+    protected boolean secure;
+
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final CountDownLatch connectLatch = new CountDownLatch(1);
+    private IOException failureCause;
+    private Throwable pendingFailure;
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remote = remoteLocation;
+        this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
+    }
+
+    @Override
+    public void connect() throws IOException {
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        group = new NioEventLoopGroup(1);
+
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+
+            @Override
+            public void initChannel(Channel connectedChannel) throws Exception {
+                configureChannel(connectedChannel);
+            }
+        });
+
+        configureNetty(bootstrap, getTransportOptions());
+
+        ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
+        future.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (future.isSuccess()) {
+                    handleConnected(future.channel());
+                } else if (future.isCancelled()) {
+                    connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+                } else {
+                    connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+                }
+            }
+        });
+
+        try {
+            connectLatch.await();
+        } catch (InterruptedException ex) {
+            LOG.debug("Transport connection was interrupted.");
+            Thread.interrupted();
+            failureCause = IOExceptionSupport.create(ex);
+        }
+
+        if (failureCause != null) {
+            // Close out any Netty resources now as they are no longer needed.
+            if (channel != null) {
+                channel.close().syncUninterruptibly();
+                channel = null;
+            }
+            if (group != null) {
+                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                group = null;
+            }
+
+            throw failureCause;
+        } else {
+            // Connected, allow any held async error to fire now and close the transport.
+            channel.eventLoop().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    if (pendingFailure != null) {
+                        channel.pipeline().fireExceptionCaught(pendingFailure);
+                    }
+                }
+            });
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    @Override
+    public boolean isSSL() {
+        return secure;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            connected.set(false);
+            if (channel != null) {
+                channel.close().syncUninterruptibly();
+            }
+            if (group != null) {
+                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    @Override
+    public ByteBuf allocateSendBuffer(int size) throws IOException {
+        checkConnected();
+        return channel.alloc().ioBuffer(size, size);
+    }
+
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        int length = output.readableBytes();
+        if (length == 0) {
+            return;
+        }
+
+        LOG.trace("Attempted write of: {} bytes", length);
+
+        channel.writeAndFlush(output);
+    }
+
+    @Override
+    public NettyTransportListener getTransportListener() {
+        return listener;
+    }
+
+    @Override
+    public void setTransportListener(NettyTransportListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public NettyTransportOptions getTransportOptions() {
+        if (options == null) {
+            if (isSSL()) {
+                options = NettyTransportSslOptions.INSTANCE;
+            } else {
+                options = NettyTransportOptions.INSTANCE;
+            }
+        }
+
+        return options;
+    }
+
+    @Override
+    public URI getRemoteLocation() {
+        return remote;
+    }
+
+    @Override
+    public Principal getLocalPrincipal() {
+        if (!isSSL()) {
+            throw new UnsupportedOperationException("Not connected to a secure channel");
+        }
+
+        SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+
+        return sslHandler.engine().getSession().getLocalPrincipal();
+    }
+
+    //----- Internal implementation details, can be overridden as needed --//
+
+    protected String getRemoteHost() {
+        return remote.getHost();
+    }
+
+    protected int getRemotePort() {
+        int port = remote.getPort();
+
+        if (port <= 0) {
+            if (isSSL()) {
+                port = getSslOptions().getDefaultSslPort();
+            } else {
+                port = getTransportOptions().getDefaultTcpPort();
+            }
+        }
+
+        return port;
+    }
+
+    protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        if (options.getSendBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+        }
+
+        if (options.getReceiveBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+        }
+
+        if (options.getTrafficClass() != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+        }
+    }
+
+    protected void configureChannel(final Channel channel) throws Exception {
+        if (isSSL()) {
+            SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                    if (future.isSuccess()) {
+                        LOG.trace("SSL Handshake has completed: {}", channel);
+                        connectionEstablished(channel);
+                    } else {
+                        LOG.trace("SSL Handshake has failed: {}", channel);
+                        connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+                    }
+                }
+            });
+
+            channel.pipeline().addLast(sslHandler);
+        }
+
+        channel.pipeline().addLast(new NettyTcpTransportHandler());
+    }
+
+    protected void handleConnected(final Channel channel) throws Exception {
+        if (!isSSL()) {
+            connectionEstablished(channel);
+        }
+    }
+
+    //----- State change handlers and checks ---------------------------------//
+
+    /**
+     * Called when the transport has successfully connected and is ready for use.
+     */
+    protected void connectionEstablished(Channel connectedChannel) {
+        channel = connectedChannel;
+        connected.set(true);
+        connectLatch.countDown();
+    }
+
+    /**
+     * Called when the transport connection failed and an error should be returned.
+     *
+     * @param failedChannel
+     *      The Channel instance that failed.
+     * @param cause
+     *      An IOException that describes the cause of the failed connection.
+     */
+    protected void connectionFailed(Channel failedChannel, IOException cause) {
+        failureCause = IOExceptionSupport.create(cause);
+        channel = failedChannel;
+        connected.set(false);
+        connectLatch.countDown();
+    }
+
+    private NettyTransportSslOptions getSslOptions() {
+        return (NettyTransportSslOptions) getTransportOptions();
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    //----- Handle connection events -----------------------------------------//
+
+    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+        @Override
+        public void channelActive(ChannelHandlerContext context) throws Exception {
+            LOG.trace("Channel has become active! Channel is {}", context.channel());
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext context) throws Exception {
+            LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+            if (connected.compareAndSet(true, false) && !closed.get()) {
+                LOG.trace("Firing onTransportClosed listener");
+                listener.onTransportClosed();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+            LOG.trace("Exception on channel! Channel is {}", context.channel());
+            if (connected.compareAndSet(true, false) && !closed.get()) {
+                LOG.trace("Firing onTransportError listener");
+                if (pendingFailure != null) {
+                    listener.onTransportError(pendingFailure);
+                } else {
+                    listener.onTransportError(cause);
+                }
+            } else {
+                // Hold the first failure for later dispatch if connect succeeds.
+                // This will then trigger disconnect using the first error reported.
+                if (pendingFailure != null) {
+                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+                    pendingFailure = cause;
+                }
+            }
+        }
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+            LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
+            listener.onData(buffer);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
index 4084780..48be3a2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -16,375 +16,37 @@
  */
 package org.apache.activemq.transport.amqp.client.transport;
 
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
 import java.io.IOException;
 import java.net.URI;
 import java.security.Principal;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
 
 /**
- * TCP based transport that uses Netty as the underlying IO layer.
+ *
  */
-public class NettyTransport {
-
-    private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class);
-
-    private static final int QUIET_PERIOD = 20;
-    private static final int SHUTDOWN_TIMEOUT = 100;
-
-    protected Bootstrap bootstrap;
-    protected EventLoopGroup group;
-    protected Channel channel;
-    protected NettyTransportListener listener;
-    protected NettyTransportOptions options;
-    protected final URI remote;
-    protected boolean secure;
-
-    private final AtomicBoolean connected = new AtomicBoolean();
-    private final AtomicBoolean closed = new AtomicBoolean();
-    private final CountDownLatch connectLatch = new CountDownLatch(1);
-    private IOException failureCause;
-    private Throwable pendingFailure;
-
-    /**
-     * Create a new transport instance
-     *
-     * @param remoteLocation
-     *        the URI that defines the remote resource to connect to.
-     * @param options
-     *        the transport options used to configure the socket connection.
-     */
-    public NettyTransport(URI remoteLocation, NettyTransportOptions options) {
-        this(null, remoteLocation, options);
-    }
-
-    /**
-     * Create a new transport instance
-     *
-     * @param listener
-     *        the TransportListener that will receive events from this Transport.
-     * @param remoteLocation
-     *        the URI that defines the remote resource to connect to.
-     * @param options
-     *        the transport options used to configure the socket connection.
-     */
-    public NettyTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
-        this.options = options;
-        this.listener = listener;
-        this.remote = remoteLocation;
-        this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
-    }
-
-    public void connect() throws IOException {
-
-        if (listener == null) {
-            throw new IllegalStateException("A transport listener must be set before connection attempts.");
-        }
-
-        group = new NioEventLoopGroup(1);
-
-        bootstrap = new Bootstrap();
-        bootstrap.group(group);
-        bootstrap.channel(NioSocketChannel.class);
-        bootstrap.handler(new ChannelInitializer<Channel>() {
-
-            @Override
-            public void initChannel(Channel connectedChannel) throws Exception {
-                configureChannel(connectedChannel);
-            }
-        });
-
-        configureNetty(bootstrap, getTransportOptions());
-
-        ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
-        future.addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-                if (future.isSuccess()) {
-                    handleConnected(future.channel());
-                } else if (future.isCancelled()) {
-                    connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
-                } else {
-                    connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
-                }
-            }
-        });
-
-        try {
-            connectLatch.await();
-        } catch (InterruptedException ex) {
-            LOG.debug("Transport connection was interrupted.");
-            Thread.interrupted();
-            failureCause = IOExceptionSupport.create(ex);
-        }
-
-        if (failureCause != null) {
-            // Close out any Netty resources now as they are no longer needed.
-            if (channel != null) {
-                channel.close().syncUninterruptibly();
-                channel = null;
-            }
-            if (group != null) {
-                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-                group = null;
-            }
-
-            throw failureCause;
-        } else {
-            // Connected, allow any held async error to fire now and close the transport.
-            channel.eventLoop().execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    if (pendingFailure != null) {
-                        channel.pipeline().fireExceptionCaught(pendingFailure);
-                    }
-                }
-            });
-        }
-    }
-
-    public boolean isConnected() {
-        return connected.get();
-    }
-
-    public boolean isSSL() {
-        return secure;
-    }
-
-    public void close() throws IOException {
-        if (closed.compareAndSet(false, true)) {
-            connected.set(false);
-            if (channel != null) {
-                channel.close().syncUninterruptibly();
-            }
-            if (group != null) {
-                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
-            }
-        }
-    }
-
-    public ByteBuf allocateSendBuffer(int size) throws IOException {
-        checkConnected();
-        return channel.alloc().ioBuffer(size, size);
-    }
-
-    public void send(ByteBuf output) throws IOException {
-        checkConnected();
-        int length = output.readableBytes();
-        if (length == 0) {
-            return;
-        }
-
-        LOG.trace("Attempted write of: {} bytes", length);
-
-        channel.writeAndFlush(output);
-    }
-
-    public NettyTransportListener getTransportListener() {
-        return listener;
-    }
-
-    public void setTransportListener(NettyTransportListener listener) {
-        this.listener = listener;
-    }
-
-    public NettyTransportOptions getTransportOptions() {
-        if (options == null) {
-            if (isSSL()) {
-                options = NettyTransportSslOptions.INSTANCE;
-            } else {
-                options = NettyTransportOptions.INSTANCE;
-            }
-        }
-
-        return options;
-    }
-
-    public URI getRemoteLocation() {
-        return remote;
-    }
-
-    public Principal getLocalPrincipal() {
-        if (!isSSL()) {
-            throw new UnsupportedOperationException("Not connected to a secure channel");
-        }
-
-        SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
-
-        return sslHandler.engine().getSession().getLocalPrincipal();
-    }
-
-    //----- Internal implementation details, can be overridden as needed --//
-
-    protected String getRemoteHost() {
-        return remote.getHost();
-    }
-
-    protected int getRemotePort() {
-        int port = remote.getPort();
-
-        if (port <= 0) {
-            if (isSSL()) {
-                port = getSslOptions().getDefaultSslPort();
-            } else {
-                port = getTransportOptions().getDefaultTcpPort();
-            }
-        }
-
-        return port;
-    }
-
-    protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
-        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
-        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
-        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
-        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
-
-        if (options.getSendBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
-        }
-
-        if (options.getReceiveBufferSize() != -1) {
-            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
-            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
-        }
-
-        if (options.getTrafficClass() != -1) {
-            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
-        }
-    }
-
-    protected void configureChannel(final Channel channel) throws Exception {
-        if (isSSL()) {
-            SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
-            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
-                @Override
-                public void operationComplete(Future<Channel> future) throws Exception {
-                    if (future.isSuccess()) {
-                        LOG.trace("SSL Handshake has completed: {}", channel);
-                        connectionEstablished(channel);
-                    } else {
-                        LOG.trace("SSL Handshake has failed: {}", channel);
-                        connectionFailed(channel, IOExceptionSupport.create(future.cause()));
-                    }
-                }
-            });
-
-            channel.pipeline().addLast(sslHandler);
-        }
-
-        channel.pipeline().addLast(new NettyTcpTransportHandler());
-    }
+public interface NettyTransport {
 
-    protected void handleConnected(final Channel channel) throws Exception {
-        if (!isSSL()) {
-            connectionEstablished(channel);
-        }
-    }
+    void connect() throws IOException;
 
-    //----- State change handlers and checks ---------------------------------//
+    boolean isConnected();
 
-    /**
-     * Called when the transport has successfully connected and is ready for use.
-     */
-    protected void connectionEstablished(Channel connectedChannel) {
-        channel = connectedChannel;
-        connected.set(true);
-        connectLatch.countDown();
-    }
+    boolean isSSL();
 
-    /**
-     * Called when the transport connection failed and an error should be returned.
-     *
-     * @param failedChannel
-     *      The Channel instance that failed.
-     * @param cause
-     *      An IOException that describes the cause of the failed connection.
-     */
-    protected void connectionFailed(Channel failedChannel, IOException cause) {
-        failureCause = IOExceptionSupport.create(cause);
-        channel = failedChannel;
-        connected.set(false);
-        connectLatch.countDown();
-    }
+    void close() throws IOException;
 
-    private NettyTransportSslOptions getSslOptions() {
-        return (NettyTransportSslOptions) getTransportOptions();
-    }
+    ByteBuf allocateSendBuffer(int size) throws IOException;
 
-    private void checkConnected() throws IOException {
-        if (!connected.get()) {
-            throw new IOException("Cannot send to a non-connected transport.");
-        }
-    }
+    void send(ByteBuf output) throws IOException;
 
-    //----- Handle connection events -----------------------------------------//
+    NettyTransportListener getTransportListener();
 
-    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+    void setTransportListener(NettyTransportListener listener);
 
-        @Override
-        public void channelActive(ChannelHandlerContext context) throws Exception {
-            LOG.trace("Channel has become active! Channel is {}", context.channel());
-        }
+    NettyTransportOptions getTransportOptions();
 
-        @Override
-        public void channelInactive(ChannelHandlerContext context) throws Exception {
-            LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
-            if (connected.compareAndSet(true, false) && !closed.get()) {
-                LOG.trace("Firing onTransportClosed listener");
-                listener.onTransportClosed();
-            }
-        }
+    URI getRemoteLocation();
 
-        @Override
-        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
-            LOG.trace("Exception on channel! Channel is {}", context.channel());
-            if (connected.compareAndSet(true, false) && !closed.get()) {
-                LOG.trace("Firing onTransportError listener");
-                if (pendingFailure != null) {
-                    listener.onTransportError(pendingFailure);
-                } else {
-                    listener.onTransportError(cause);
-                }
-            } else {
-                // Hold the first failure for later dispatch if connect succeeds.
-                // This will then trigger disconnect using the first error reported.
-                if (pendingFailure != null) {
-                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
-                    pendingFailure = cause;
-                }
-            }
-        }
+    Principal getLocalPrincipal();
 
-        @Override
-        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
-            LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
-            listener.onData(buffer);
-        }
-    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
index fd50890..cc47aa2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
@@ -46,7 +46,7 @@ public final class NettyTransportFactory {
 
         remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
 
-        if (!remoteURI.getScheme().equalsIgnoreCase("ssl")) {
+        if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) {
             transportOptions = NettyTransportOptions.INSTANCE.clone();
         } else {
             transportOptions = NettyTransportSslOptions.INSTANCE.clone();
@@ -61,7 +61,20 @@ public final class NettyTransportFactory {
             throw new IllegalArgumentException(msg);
         }
 
-        NettyTransport result = new NettyTransport(remoteURI, transportOptions);
+        NettyTransport result = null;
+
+        switch (remoteURI.getScheme().toLowerCase()) {
+            case "tcp":
+            case "ssl":
+                result = new NettyTcpTransport(remoteURI, transportOptions);
+                break;
+            case "ws":
+            case "wss":
+                result = new NettyWSTransport(remoteURI, transportOptions);
+                break;
+            default:
+                throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
+        }
 
         return result;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
index 92ffd3c..b01f884 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
@@ -30,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
     public static final String DEFAULT_STORE_TYPE = "jks";
     public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
     public static final boolean DEFAULT_TRUST_ALL = false;
-    public static final boolean DEFAULT_VERIFY_HOST = true;
+    public static final boolean DEFAULT_VERIFY_HOST = false;
     public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
     public static final int DEFAULT_SSL_PORT = 5671;