You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/07/20 09:35:38 UTC

[6/9] activemq-artemis git commit: ARTEMIS-637 Port 5.x AMQP test client

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
new file mode 100644
index 0000000..f790433
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ */
+public class NettyTcpTransport implements NettyTransport {
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+   private static final int QUIET_PERIOD = 20;
+   private static final int SHUTDOWN_TIMEOUT = 100;
+
+   protected Bootstrap bootstrap;
+   protected EventLoopGroup group;
+   protected Channel channel;
+   protected NettyTransportListener listener;
+   protected NettyTransportOptions options;
+   protected final URI remote;
+   protected boolean secure;
+
+   private final AtomicBoolean connected = new AtomicBoolean();
+   private final AtomicBoolean closed = new AtomicBoolean();
+   private final CountDownLatch connectLatch = new CountDownLatch(1);
+   private IOException failureCause;
+   private Throwable pendingFailure;
+
+   /**
+    * Create a new transport instance
+    *
+    * @param remoteLocation the URI that defines the remote resource to connect to.
+    * @param options        the transport options used to configure the socket connection.
+    */
+   public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
+      this(null, remoteLocation, options);
+   }
+
+   /**
+    * Create a new transport instance
+    *
+    * @param listener       the TransportListener that will receive events from this Transport.
+    * @param remoteLocation the URI that defines the remote resource to connect to.
+    * @param options        the transport options used to configure the socket connection.
+    */
+   public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+      this.options = options;
+      this.listener = listener;
+      this.remote = remoteLocation;
+      this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
+   }
+
+   @Override
+   public void connect() throws IOException {
+
+      if (listener == null) {
+         throw new IllegalStateException("A transport listener must be set before connection attempts.");
+      }
+
+      group = new NioEventLoopGroup(1);
+
+      bootstrap = new Bootstrap();
+      bootstrap.group(group);
+      bootstrap.channel(NioSocketChannel.class);
+      bootstrap.handler(new ChannelInitializer<Channel>() {
+
+         @Override
+         public void initChannel(Channel connectedChannel) throws Exception {
+            configureChannel(connectedChannel);
+         }
+      });
+
+      configureNetty(bootstrap, getTransportOptions());
+
+      ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
+      future.addListener(new ChannelFutureListener() {
+
+         @Override
+         public void operationComplete(ChannelFuture future) throws Exception {
+            if (future.isSuccess()) {
+               handleConnected(future.channel());
+            }
+            else if (future.isCancelled()) {
+               connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+            }
+            else {
+               connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+            }
+         }
+      });
+
+      try {
+         connectLatch.await();
+      }
+      catch (InterruptedException ex) {
+         LOG.debug("Transport connection was interrupted.");
+         Thread.interrupted();
+         failureCause = IOExceptionSupport.create(ex);
+      }
+
+      if (failureCause != null) {
+         // Close out any Netty resources now as they are no longer needed.
+         if (channel != null) {
+            channel.close().syncUninterruptibly();
+            channel = null;
+         }
+         if (group != null) {
+            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            group = null;
+         }
+
+         throw failureCause;
+      }
+      else {
+         // Connected, allow any held async error to fire now and close the transport.
+         channel.eventLoop().execute(new Runnable() {
+
+            @Override
+            public void run() {
+               if (pendingFailure != null) {
+                  channel.pipeline().fireExceptionCaught(pendingFailure);
+               }
+            }
+         });
+      }
+   }
+
+   @Override
+   public boolean isConnected() {
+      return connected.get();
+   }
+
+   @Override
+   public boolean isSSL() {
+      return secure;
+   }
+
+   @Override
+   public void close() throws IOException {
+      if (closed.compareAndSet(false, true)) {
+         connected.set(false);
+         if (channel != null) {
+            channel.close().syncUninterruptibly();
+         }
+         if (group != null) {
+            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+         }
+      }
+   }
+
+   @Override
+   public ByteBuf allocateSendBuffer(int size) throws IOException {
+      checkConnected();
+      return channel.alloc().ioBuffer(size, size);
+   }
+
+   @Override
+   public void send(ByteBuf output) throws IOException {
+      checkConnected();
+      int length = output.readableBytes();
+      if (length == 0) {
+         return;
+      }
+
+      LOG.trace("Attempted write of: {} bytes", length);
+
+      channel.writeAndFlush(output);
+   }
+
+   @Override
+   public NettyTransportListener getTransportListener() {
+      return listener;
+   }
+
+   @Override
+   public void setTransportListener(NettyTransportListener listener) {
+      this.listener = listener;
+   }
+
+   @Override
+   public NettyTransportOptions getTransportOptions() {
+      if (options == null) {
+         if (isSSL()) {
+            options = NettyTransportSslOptions.INSTANCE;
+         }
+         else {
+            options = NettyTransportOptions.INSTANCE;
+         }
+      }
+
+      return options;
+   }
+
+   @Override
+   public URI getRemoteLocation() {
+      return remote;
+   }
+
+   @Override
+   public Principal getLocalPrincipal() {
+      if (!isSSL()) {
+         throw new UnsupportedOperationException("Not connected to a secure channel");
+      }
+
+      SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+
+      return sslHandler.engine().getSession().getLocalPrincipal();
+   }
+
+   //----- Internal implementation details, can be overridden as needed --//
+
+   protected String getRemoteHost() {
+      return remote.getHost();
+   }
+
+   protected int getRemotePort() {
+      int port = remote.getPort();
+
+      if (port <= 0) {
+         if (isSSL()) {
+            port = getSslOptions().getDefaultSslPort();
+         }
+         else {
+            port = getTransportOptions().getDefaultTcpPort();
+         }
+      }
+
+      return port;
+   }
+
+   protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+      bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+      bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+      bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+      bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+      if (options.getSendBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+      }
+
+      if (options.getReceiveBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+      }
+
+      if (options.getTrafficClass() != -1) {
+         bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+      }
+   }
+
+   protected void configureChannel(final Channel channel) throws Exception {
+      if (isSSL()) {
+         SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+         sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+            @Override
+            public void operationComplete(Future<Channel> future) throws Exception {
+               if (future.isSuccess()) {
+                  LOG.trace("SSL Handshake has completed: {}", channel);
+                  connectionEstablished(channel);
+               }
+               else {
+                  LOG.trace("SSL Handshake has failed: {}", channel);
+                  connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+               }
+            }
+         });
+
+         channel.pipeline().addLast(sslHandler);
+      }
+
+      channel.pipeline().addLast(new NettyTcpTransportHandler());
+   }
+
+   protected void handleConnected(final Channel channel) throws Exception {
+      if (!isSSL()) {
+         connectionEstablished(channel);
+      }
+   }
+
+   //----- State change handlers and checks ---------------------------------//
+
+   /**
+    * Called when the transport has successfully connected and is ready for use.
+    */
+   protected void connectionEstablished(Channel connectedChannel) {
+      channel = connectedChannel;
+      connected.set(true);
+      connectLatch.countDown();
+   }
+
+   /**
+    * Called when the transport connection failed and an error should be returned.
+    *
+    * @param failedChannel The Channel instance that failed.
+    * @param cause         An IOException that describes the cause of the failed connection.
+    */
+   protected void connectionFailed(Channel failedChannel, IOException cause) {
+      failureCause = IOExceptionSupport.create(cause);
+      channel = failedChannel;
+      connected.set(false);
+      connectLatch.countDown();
+   }
+
+   private NettyTransportSslOptions getSslOptions() {
+      return (NettyTransportSslOptions) getTransportOptions();
+   }
+
+   private void checkConnected() throws IOException {
+      if (!connected.get()) {
+         throw new IOException("Cannot send to a non-connected transport.");
+      }
+   }
+
+   //----- Handle connection events -----------------------------------------//
+
+   private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+      @Override
+      public void channelActive(ChannelHandlerContext context) throws Exception {
+         LOG.trace("Channel has become active! Channel is {}", context.channel());
+      }
+
+      @Override
+      public void channelInactive(ChannelHandlerContext context) throws Exception {
+         LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+         if (connected.compareAndSet(true, false) && !closed.get()) {
+            LOG.trace("Firing onTransportClosed listener");
+            listener.onTransportClosed();
+         }
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+         LOG.trace("Exception on channel! Channel is {}", context.channel());
+         if (connected.compareAndSet(true, false) && !closed.get()) {
+            LOG.trace("Firing onTransportError listener");
+            if (pendingFailure != null) {
+               listener.onTransportError(pendingFailure);
+            }
+            else {
+               listener.onTransportError(cause);
+            }
+         }
+         else {
+            // Hold the first failure for later dispatch if connect succeeds.
+            // This will then trigger disconnect using the first error reported.
+            if (pendingFailure != null) {
+               LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+               pendingFailure = cause;
+            }
+         }
+      }
+
+      @Override
+      protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+         LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
+         listener.onData(buffer);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
new file mode 100644
index 0000000..a2bacdc
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Contract for the Netty based client transports in this package, such as
+ * {@link NettyTcpTransport}. Events from a transport are delivered to its
+ * {@link NettyTransportListener}.
+ */
+public interface NettyTransport {
+
+   /**
+    * Opens the connection to the remote location.
+    *
+    * @throws IOException if the connection attempt fails.
+    */
+   void connect() throws IOException;
+
+   /**
+    * @return true if the transport is currently connected.
+    */
+   boolean isConnected();
+
+   /**
+    * @return true if this transport uses SSL.
+    */
+   boolean isSSL();
+
+   /**
+    * Closes the transport.
+    *
+    * @throws IOException if an error occurs while closing.
+    */
+   void close() throws IOException;
+
+   /**
+    * Allocates a buffer that can be filled and handed to {@link #send(ByteBuf)}.
+    *
+    * @param size the size of the buffer to allocate.
+    * @return the allocated buffer.
+    * @throws IOException if the buffer cannot be allocated, e.g. NettyTcpTransport
+    *                     throws when it is not connected.
+    */
+   ByteBuf allocateSendBuffer(int size) throws IOException;
+
+   /**
+    * Sends the readable bytes of the given buffer to the remote peer.
+    *
+    * @param output the buffer to send.
+    * @throws IOException if the data cannot be sent, e.g. NettyTcpTransport
+    *                     throws when it is not connected.
+    */
+   void send(ByteBuf output) throws IOException;
+
+   /**
+    * @return the listener that receives events from this transport.
+    */
+   NettyTransportListener getTransportListener();
+
+   /**
+    * @param listener the listener that will receive events from this transport.
+    */
+   void setTransportListener(NettyTransportListener listener);
+
+   /**
+    * @return the options used to configure this transport.
+    */
+   NettyTransportOptions getTransportOptions();
+
+   /**
+    * @return the URI of the remote location this transport connects to.
+    */
+   URI getRemoteLocation();
+
+   /**
+    * @return the local principal of a secure transport; NettyTcpTransport throws
+    *         UnsupportedOperationException when the transport is not SSL.
+    */
+   Principal getLocalPrincipal();
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
new file mode 100644
index 0000000..5663713
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.util.PropertyUtil;
+
+/**
+ * Factory for creating the Netty based TCP Transport.
+ */
+public final class NettyTransportFactory {
+
+   private NettyTransportFactory() {
+   }
+
+   /**
+    * Builds a transport for the given remote URI. Query parameters prefixed with
+    * "transport." are applied to a fresh copy of the default options (SSL options
+    * for the "ssl" and "wss" schemes, plain options otherwise) and the URI scheme
+    * selects the concrete transport type.
+    *
+    * @param remoteURI The URI used to connect to a remote Peer.
+    * @return a new Transport instance.
+    * @throws IllegalArgumentException if a transport option cannot be applied or
+    *                                  the URI scheme is not one of tcp, ssl, ws or wss.
+    * @throws Exception if an error occurs while creating the Transport instance.
+    */
+   public static NettyTransport createTransport(URI remoteURI) throws Exception {
+      Map<String, String> queryOptions = PropertyUtil.parseQuery(remoteURI.getQuery());
+      Map<String, String> transportURIOptions = PropertyUtil.filterProperties(queryOptions, "transport.");
+
+      // Rebuild the URI from what is left in the query map after filtering.
+      remoteURI = PropertyUtil.replaceQuery(remoteURI, queryOptions);
+
+      final String scheme = remoteURI.getScheme();
+      final boolean secure = scheme.equalsIgnoreCase("ssl") || scheme.equalsIgnoreCase("wss");
+
+      final NettyTransportOptions transportOptions;
+      if (secure) {
+         transportOptions = NettyTransportSslOptions.INSTANCE.clone();
+      }
+      else {
+         transportOptions = NettyTransportOptions.INSTANCE.clone();
+      }
+
+      Map<String, String> unused = PropertyUtil.setProperties(transportOptions, transportURIOptions);
+      if (!unused.isEmpty()) {
+         throw new IllegalArgumentException(" Not all transport options could be set on the TCP based" +
+            " Transport. Check the options are spelled correctly." +
+            " Unused parameters=[" + unused + "]." +
+            " This provider instance cannot be started.");
+      }
+
+      switch (scheme.toLowerCase()) {
+         case "tcp":
+         case "ssl":
+            return new NettyTcpTransport(remoteURI, transportOptions);
+         case "ws":
+         case "wss":
+            return new NettyWSTransport(remoteURI, transportOptions);
+         default:
+            throw new IllegalArgumentException("Invalid URI Scheme: " + scheme);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
new file mode 100644
index 0000000..c23ca8c
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Listener interface that should be implemented by users of the
+ * {@link NettyTransport} implementations in order to receive incoming data
+ * and connection state events.
+ */
+public interface NettyTransportListener {
+
+   /**
+    * Called when new incoming data has become available.
+    *
+    * @param incoming the next incoming packet of data.
+    */
+   void onData(ByteBuf incoming);
+
+   /**
+    * Called if the connection state becomes closed. NettyTcpTransport only
+    * reports this when the channel goes inactive while the transport was
+    * connected and had not been closed locally.
+    */
+   void onTransportClosed();
+
+   /**
+    * Called when an error occurs during normal Transport operations.
+    *
+    * @param cause the error that triggered this event.
+    */
+   void onTransportError(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
new file mode 100644
index 0000000..3ffb8c8
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+/**
+ * Encapsulates all the TCP Transport options in one configuration object.
+ * <p>
+ * Instances are mutable; use {@link #clone()} to obtain an independent copy
+ * before modifying the shared {@link #INSTANCE}.
+ */
+public class NettyTransportOptions implements Cloneable {
+
+   public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
+   public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
+   public static final int DEFAULT_TRAFFIC_CLASS = 0;
+   public static final boolean DEFAULT_TCP_NO_DELAY = true;
+   public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
+   public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
+   public static final int DEFAULT_SO_TIMEOUT = -1;
+   public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+   public static final int DEFAULT_TCP_PORT = 5672;
+
+   public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
+
+   private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+   private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+   private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+   private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+   private int soTimeout = DEFAULT_SO_TIMEOUT;
+   private int soLinger = DEFAULT_SO_LINGER;
+   private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
+   private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+   private int defaultTcpPort = DEFAULT_TCP_PORT;
+
+   /**
+    * @return the currently set send buffer size in bytes.
+    */
+   public int getSendBufferSize() {
+      return sendBufferSize;
+   }
+
+   /**
+    * Sets the send buffer size in bytes, the value must be greater than zero
+    * or an {@link IllegalArgumentException} will be thrown.
+    *
+    * @param sendBufferSize the new send buffer size for the TCP Transport.
+    * @throws IllegalArgumentException if the value given is not in the valid range.
+    */
+   public void setSendBufferSize(int sendBufferSize) {
+      if (sendBufferSize <= 0) {
+         throw new IllegalArgumentException("The send buffer size must be > 0");
+      }
+
+      this.sendBufferSize = sendBufferSize;
+   }
+
+   /**
+    * @return the currently configured receive buffer size in bytes.
+    */
+   public int getReceiveBufferSize() {
+      return receiveBufferSize;
+   }
+
+   /**
+    * Sets the receive buffer size in bytes, the value must be greater than zero
+    * or an {@link IllegalArgumentException} will be thrown.
+    *
+    * @param receiveBufferSize the new receive buffer size for the TCP Transport.
+    * @throws IllegalArgumentException if the value given is not in the valid range.
+    */
+   public void setReceiveBufferSize(int receiveBufferSize) {
+      if (receiveBufferSize <= 0) {
+         throw new IllegalArgumentException("The receive buffer size must be > 0");
+      }
+
+      this.receiveBufferSize = receiveBufferSize;
+   }
+
+   /**
+    * @return the currently configured traffic class value.
+    */
+   public int getTrafficClass() {
+      return trafficClass;
+   }
+
+   /**
+    * Sets the traffic class value used by the TCP connection, valid
+    * range is between 0 and 255.
+    *
+    * @param trafficClass the new traffic class value.
+    * @throws IllegalArgumentException if the value given is not in the valid range.
+    */
+   public void setTrafficClass(int trafficClass) {
+      if (trafficClass < 0 || trafficClass > 255) {
+         throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
+      }
+
+      this.trafficClass = trafficClass;
+   }
+
+   /**
+    * @return the currently configured soTimeout value.
+    */
+   public int getSoTimeout() {
+      return soTimeout;
+   }
+
+   /**
+    * @param soTimeout the new soTimeout value; stored without validation.
+    */
+   public void setSoTimeout(int soTimeout) {
+      this.soTimeout = soTimeout;
+   }
+
+   /**
+    * @return true if TCP_NODELAY should be enabled on the connection.
+    */
+   public boolean isTcpNoDelay() {
+      return tcpNoDelay;
+   }
+
+   /**
+    * @param tcpNoDelay whether TCP_NODELAY should be enabled on the connection.
+    */
+   public void setTcpNoDelay(boolean tcpNoDelay) {
+      this.tcpNoDelay = tcpNoDelay;
+   }
+
+   /**
+    * @return the currently configured SO_LINGER value.
+    */
+   public int getSoLinger() {
+      return soLinger;
+   }
+
+   /**
+    * @param soLinger the new SO_LINGER value; stored without validation.
+    */
+   public void setSoLinger(int soLinger) {
+      this.soLinger = soLinger;
+   }
+
+   /**
+    * @return true if SO_KEEPALIVE should be enabled on the connection.
+    */
+   public boolean isTcpKeepAlive() {
+      return tcpKeepAlive;
+   }
+
+   /**
+    * @param keepAlive whether SO_KEEPALIVE should be enabled on the connection.
+    */
+   public void setTcpKeepAlive(boolean keepAlive) {
+      this.tcpKeepAlive = keepAlive;
+   }
+
+   /**
+    * @return the connect timeout; NettyTcpTransport applies it as milliseconds.
+    */
+   public int getConnectTimeout() {
+      return connectTimeout;
+   }
+
+   /**
+    * @param connectTimeout the new connect timeout; stored without validation.
+    */
+   public void setConnectTimeout(int connectTimeout) {
+      this.connectTimeout = connectTimeout;
+   }
+
+   /**
+    * @return the port to use when the remote URI does not specify one.
+    */
+   public int getDefaultTcpPort() {
+      return defaultTcpPort;
+   }
+
+   /**
+    * @param defaultTcpPort the port to use when the remote URI does not specify one.
+    */
+   public void setDefaultTcpPort(int defaultTcpPort) {
+      this.defaultTcpPort = defaultTcpPort;
+   }
+
+   /**
+    * @return a new options instance carrying the same values as this one.
+    */
+   @Override
+   public NettyTransportOptions clone() {
+      return copyOptions(new NettyTransportOptions());
+   }
+
+   /**
+    * Copies every option held by this instance onto the given target.
+    *
+    * @param copy the instance that receives the option values.
+    * @return the instance passed in, to allow call chaining.
+    */
+   protected NettyTransportOptions copyOptions(NettyTransportOptions copy) {
+      copy.setConnectTimeout(getConnectTimeout());
+      copy.setReceiveBufferSize(getReceiveBufferSize());
+      copy.setSendBufferSize(getSendBufferSize());
+      copy.setSoLinger(getSoLinger());
+      copy.setSoTimeout(getSoTimeout());
+      copy.setTcpKeepAlive(isTcpKeepAlive());
+      copy.setTcpNoDelay(isTcpNoDelay());
+      copy.setTrafficClass(getTrafficClass());
+      // Without this a customised default port would be lost on clone().
+      copy.setDefaultTcpPort(getDefaultTcpPort());
+
+      return copy;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
new file mode 100644
index 0000000..e256fbb
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Holds the defined SSL options for connections that operate over a secure
+ * transport.  Options are read from the environment and can be overridden by
+ * specifying them on the connection URI.
+ */
+public class NettyTransportSslOptions extends NettyTransportOptions {
+
+   public static final String DEFAULT_STORE_TYPE = "jks";
+   public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
+   public static final boolean DEFAULT_TRUST_ALL = false;
+   public static final boolean DEFAULT_VERIFY_HOST = false;
+   public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList("SSLv2Hello", "SSLv3"));
+   public static final int DEFAULT_SSL_PORT = 5671;
+
+   /**
+    * Shared default instance, seeded from the javax.net.ssl.* system properties
+    * in the static initializer below.
+    */
+   public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
+
+   private String keyStoreLocation;
+   private String keyStorePassword;
+   private String trustStoreLocation;
+   private String trustStorePassword;
+   private String storeType = DEFAULT_STORE_TYPE;
+   private String[] enabledCipherSuites;
+   private String[] disabledCipherSuites;
+   private String[] enabledProtocols;
+   private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]);
+   private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL;
+
+   private boolean trustAll = DEFAULT_TRUST_ALL;
+   private boolean verifyHost = DEFAULT_VERIFY_HOST;
+   private String keyAlias;
+   private int defaultSslPort = DEFAULT_SSL_PORT;
+
+   static {
+      INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
+      INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
+      INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore"));
+      // The trust store has its own password property; reading the key store
+      // password here broke setups where the two stores use different passwords.
+      INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.trustStorePassword"));
+   }
+
+   /**
+    * @return the keyStoreLocation currently configured.
+    */
+   public String getKeyStoreLocation() {
+      return keyStoreLocation;
+   }
+
+   /**
+    * Sets the location on disk of the key store to use.
+    *
+    * @param keyStoreLocation the keyStoreLocation to use to create the key manager.
+    */
+   public void setKeyStoreLocation(String keyStoreLocation) {
+      this.keyStoreLocation = keyStoreLocation;
+   }
+
+   /**
+    * @return the keyStorePassword
+    */
+   public String getKeyStorePassword() {
+      return keyStorePassword;
+   }
+
+   /**
+    * @param keyStorePassword the keyStorePassword to set
+    */
+   public void setKeyStorePassword(String keyStorePassword) {
+      this.keyStorePassword = keyStorePassword;
+   }
+
+   /**
+    * @return the trustStoreLocation
+    */
+   public String getTrustStoreLocation() {
+      return trustStoreLocation;
+   }
+
+   /**
+    * @param trustStoreLocation the trustStoreLocation to set
+    */
+   public void setTrustStoreLocation(String trustStoreLocation) {
+      this.trustStoreLocation = trustStoreLocation;
+   }
+
+   /**
+    * @return the trustStorePassword
+    */
+   public String getTrustStorePassword() {
+      return trustStorePassword;
+   }
+
+   /**
+    * @param trustStorePassword the trustStorePassword to set
+    */
+   public void setTrustStorePassword(String trustStorePassword) {
+      this.trustStorePassword = trustStorePassword;
+   }
+
+   /**
+    * @return the storeType
+    */
+   public String getStoreType() {
+      return storeType;
+   }
+
+   /**
+    * @param storeType the format that the store files are encoded in.
+    */
+   public void setStoreType(String storeType) {
+      this.storeType = storeType;
+   }
+
+   /**
+    * @return the enabledCipherSuites
+    */
+   public String[] getEnabledCipherSuites() {
+      return enabledCipherSuites;
+   }
+
+   /**
+    * @param enabledCipherSuites the enabledCipherSuites to set
+    */
+   public void setEnabledCipherSuites(String[] enabledCipherSuites) {
+      this.enabledCipherSuites = enabledCipherSuites;
+   }
+
+   /**
+    * @return the disabledCipherSuites
+    */
+   public String[] getDisabledCipherSuites() {
+      return disabledCipherSuites;
+   }
+
+   /**
+    * @param disabledCipherSuites the disabledCipherSuites to set
+    */
+   public void setDisabledCipherSuites(String[] disabledCipherSuites) {
+      this.disabledCipherSuites = disabledCipherSuites;
+   }
+
+   /**
+    * @return the enabledProtocols or null if the defaults should be used
+    */
+   public String[] getEnabledProtocols() {
+      return enabledProtocols;
+   }
+
+   /**
+    * The protocols to be set as enabled.
+    *
+    * @param enabledProtocols the enabled protocols to set, or null if the defaults should be used.
+    */
+   public void setEnabledProtocols(String[] enabledProtocols) {
+      this.enabledProtocols = enabledProtocols;
+   }
+
+   /**
+    * @return the protocols to disable or null if none should be
+    */
+   public String[] getDisabledProtocols() {
+      return disabledProtocols;
+   }
+
+   /**
+    * The protocols to be disabled.
+    *
+    * @param disabledProtocols the protocols to disable, or null if none should be.
+    */
+   public void setDisabledProtocols(String[] disabledProtocols) {
+      this.disabledProtocols = disabledProtocols;
+   }
+
+   /**
+    * @return the context protocol to use
+    */
+   public String getContextProtocol() {
+      return contextProtocol;
+   }
+
+   /**
+    * The protocol value to use when creating an SSLContext via
+    * SSLContext.getInstance(protocol).
+    *
+    * @param contextProtocol the context protocol to use.
+    */
+   public void setContextProtocol(String contextProtocol) {
+      this.contextProtocol = contextProtocol;
+   }
+
+   /**
+    * @return the trustAll
+    */
+   public boolean isTrustAll() {
+      return trustAll;
+   }
+
+   /**
+    * @param trustAll the trustAll to set
+    */
+   public void setTrustAll(boolean trustAll) {
+      this.trustAll = trustAll;
+   }
+
+   /**
+    * @return the verifyHost
+    */
+   public boolean isVerifyHost() {
+      return verifyHost;
+   }
+
+   /**
+    * @param verifyHost the verifyHost to set
+    */
+   public void setVerifyHost(boolean verifyHost) {
+      this.verifyHost = verifyHost;
+   }
+
+   /**
+    * @return the key alias
+    */
+   public String getKeyAlias() {
+      return keyAlias;
+   }
+
+   /**
+    * @param keyAlias the key alias to use
+    */
+   public void setKeyAlias(String keyAlias) {
+      this.keyAlias = keyAlias;
+   }
+
+   /**
+    * @return the default SSL port configured on these options.
+    */
+   public int getDefaultSslPort() {
+      return defaultSslPort;
+   }
+
+   /**
+    * @param defaultSslPort the default SSL port to store on these options.
+    */
+   public void setDefaultSslPort(int defaultSslPort) {
+      this.defaultSslPort = defaultSslPort;
+   }
+
+   /**
+    * @return a new NettyTransportSslOptions populated via {@link #copyOptions(NettyTransportSslOptions)}.
+    */
+   @Override
+   public NettyTransportSslOptions clone() {
+      return copyOptions(new NettyTransportSslOptions());
+   }
+
+   /**
+    * Copies the inherited options and every SSL specific option into the given target.
+    * Array valued options are copied by reference, not duplicated.
+    *
+    * @param copy the options object that receives this instance's values.
+    * @return the same {@code copy} instance.
+    */
+   protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) {
+      super.copyOptions(copy);
+
+      copy.setKeyStoreLocation(getKeyStoreLocation());
+      copy.setKeyStorePassword(getKeyStorePassword());
+      copy.setTrustStoreLocation(getTrustStoreLocation());
+      copy.setTrustStorePassword(getTrustStorePassword());
+      copy.setStoreType(getStoreType());
+      copy.setEnabledCipherSuites(getEnabledCipherSuites());
+      copy.setDisabledCipherSuites(getDisabledCipherSuites());
+      copy.setEnabledProtocols(getEnabledProtocols());
+      copy.setDisabledProtocols(getDisabledProtocols());
+      copy.setTrustAll(isTrustAll());
+      copy.setVerifyHost(isVerifyHost());
+      copy.setKeyAlias(getKeyAlias());
+      copy.setContextProtocol(getContextProtocol());
+      // Previously omitted: a clone would otherwise revert to DEFAULT_SSL_PORT.
+      copy.setDefaultSslPort(getDefaultSslPort());
+      return copy;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
new file mode 100644
index 0000000..51cedea
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedKeyManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import io.netty.handler.ssl.SslHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static class that provides various utility methods used by Transport implementations.
+ */
+public class NettyTransportSupport {
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
+
+   /**
+    * Creates a Netty SslHandler instance for use in Transports that require
+    * an SSL encoder / decoder.
+    *
+    * @param remote  The URI of the remote peer that the SslHandler will be used against.
+    * @param options The SSL options object to build the SslHandler instance from.
+    * @return a new SslHandler that is configured from the given options.
+    * @throws Exception if an error occurs while creating the SslHandler instance.
+    */
+   public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
+      return new SslHandler(createSslEngine(remote, createSslContext(options), options));
+   }
+
+   /**
+    * Create a new SSLContext using the options specific in the given TransportSslOptions
+    * instance.
+    *
+    * @param options the configured options used to create the SSLContext.
+    * @return a new SSLContext instance.
+    * @throws Exception if an error occurs while creating the context.
+    */
+   public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
+      try {
+         String contextProtocol = options.getContextProtocol();
+         LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol);
+
+         SSLContext context = SSLContext.getInstance(contextProtocol);
+         KeyManager[] keyMgrs = loadKeyManagers(options);
+         TrustManager[] trustManagers = loadTrustManagers(options);
+
+         context.init(keyMgrs, trustManagers, new SecureRandom());
+         return context;
+      }
+      catch (Exception e) {
+         LOG.error("Failed to create SSLContext: {}", e, e);
+         throw e;
+      }
+   }
+
+   /**
+    * Create a new SSLEngine instance in client mode from the given SSLContext and
+    * TransportSslOptions instances.
+    *
+    * @param context the SSLContext to use when creating the engine.
+    * @param options the TransportSslOptions to use to configure the new SSLEngine.
+    * @return a new SSLEngine instance in client mode.
+    * @throws Exception if an error occurs while creating the new SSLEngine.
+    */
+   public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
+      return createSslEngine(null, context, options);
+   }
+
+   /**
+    * Create a new SSLEngine instance in client mode from the given SSLContext and
+    * TransportSslOptions instances.
+    *
+    * @param remote  the URI of the remote peer that will be used to initialize the engine, may be null if none should.
+    * @param context the SSLContext to use when creating the engine.
+    * @param options the TransportSslOptions to use to configure the new SSLEngine.
+    * @return a new SSLEngine instance in client mode.
+    * @throws Exception if an error occurs while creating the new SSLEngine.
+    */
+   public static SSLEngine createSslEngine(URI remote,
+                                           SSLContext context,
+                                           NettyTransportSslOptions options) throws Exception {
+      SSLEngine engine = null;
+      if (remote == null) {
+         engine = context.createSSLEngine();
+      }
+      else {
+         engine = context.createSSLEngine(remote.getHost(), remote.getPort());
+      }
+
+      engine.setEnabledProtocols(buildEnabledProtocols(engine, options));
+      engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options));
+      engine.setUseClientMode(true);
+
+      if (options.isVerifyHost()) {
+         SSLParameters sslParameters = engine.getSSLParameters();
+         sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+         engine.setSSLParameters(sslParameters);
+      }
+
+      return engine;
+   }
+
+   private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) {
+      List<String> enabledProtocols = new ArrayList<>();
+
+      if (options.getEnabledProtocols() != null) {
+         List<String> configuredProtocols = Arrays.asList(options.getEnabledProtocols());
+         LOG.trace("Configured protocols from transport options: {}", configuredProtocols);
+         enabledProtocols.addAll(configuredProtocols);
+      }
+      else {
+         List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols());
+         LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols);
+         enabledProtocols.addAll(engineProtocols);
+      }
+
+      String[] disabledProtocols = options.getDisabledProtocols();
+      if (disabledProtocols != null) {
+         List<String> disabled = Arrays.asList(disabledProtocols);
+         LOG.trace("Disabled protocols: {}", disabled);
+         enabledProtocols.removeAll(disabled);
+      }
+
+      LOG.trace("Enabled protocols: {}", enabledProtocols);
+
+      return enabledProtocols.toArray(new String[0]);
+   }
+
+   private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) {
+      List<String> enabledCipherSuites = new ArrayList<>();
+
+      if (options.getEnabledCipherSuites() != null) {
+         List<String> configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites());
+         LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites);
+         enabledCipherSuites.addAll(configuredCipherSuites);
+      }
+      else {
+         List<String> engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
+         LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites);
+         enabledCipherSuites.addAll(engineCipherSuites);
+      }
+
+      String[] disabledCipherSuites = options.getDisabledCipherSuites();
+      if (disabledCipherSuites != null) {
+         List<String> disabled = Arrays.asList(disabledCipherSuites);
+         LOG.trace("Disabled cipher suites: {}", disabled);
+         enabledCipherSuites.removeAll(disabled);
+      }
+
+      LOG.trace("Enabled cipher suites: {}", enabledCipherSuites);
+
+      return enabledCipherSuites.toArray(new String[0]);
+   }
+
+   private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
+      if (options.isTrustAll()) {
+         return new TrustManager[]{createTrustAllTrustManager()};
+      }
+
+      if (options.getTrustStoreLocation() == null) {
+         return null;
+      }
+
+      TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+
+      String storeLocation = options.getTrustStoreLocation();
+      String storePassword = options.getTrustStorePassword();
+      String storeType = options.getStoreType();
+
+      LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType);
+
+      KeyStore trustStore = loadStore(storeLocation, storePassword, storeType);
+      fact.init(trustStore);
+
+      return fact.getTrustManagers();
+   }
+
+   private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception {
+      if (options.getKeyStoreLocation() == null) {
+         return null;
+      }
+
+      KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+
+      String storeLocation = options.getKeyStoreLocation();
+      String storePassword = options.getKeyStorePassword();
+      String storeType = options.getStoreType();
+      String alias = options.getKeyAlias();
+
+      LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
+
+      KeyStore keyStore = loadStore(storeLocation, storePassword, storeType);
+      fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null);
+
+      if (alias == null) {
+         return fact.getKeyManagers();
+      }
+      else {
+         validateAlias(keyStore, alias);
+         return wrapKeyManagers(alias, fact.getKeyManagers());
+      }
+   }
+
+   private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) {
+      KeyManager[] keyManagers = new KeyManager[origKeyManagers.length];
+      for (int i = 0; i < origKeyManagers.length; i++) {
+         KeyManager km = origKeyManagers[i];
+         if (km instanceof X509ExtendedKeyManager) {
+            km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km);
+         }
+
+         keyManagers[i] = km;
+      }
+
+      return keyManagers;
+   }
+
+   private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException {
+      if (!store.containsAlias(alias)) {
+         throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store");
+      }
+
+      if (!store.isKeyEntry(alias)) {
+         throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry");
+      }
+   }
+
+   private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception {
+      KeyStore store = KeyStore.getInstance(storeType);
+      try (InputStream in = new FileInputStream(new File(storePath));) {
+         store.load(in, password != null ? password.toCharArray() : null);
+      }
+
+      return store;
+   }
+
+   private static TrustManager createTrustAllTrustManager() {
+      return new X509TrustManager() {
+         @Override
+         public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+         }
+
+         @Override
+         public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+         }
+
+         @Override
+         public X509Certificate[] getAcceptedIssuers() {
+            return new X509Certificate[0];
+         }
+      };
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
new file mode 100644
index 0000000..b28f523
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transport for communicating over WebSockets
+ */
+public class NettyWSTransport implements NettyTransport {
+
+   private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
+
+   private static final int QUIET_PERIOD = 20; // quiet period passed to shutdownGracefully, in milliseconds
+   private static final int SHUTDOWN_TIMEOUT = 100; // timeout passed to shutdownGracefully, in milliseconds
+
+   protected Bootstrap bootstrap;
+   protected EventLoopGroup group;
+   protected Channel channel;
+   protected NettyTransportListener listener;
+   protected NettyTransportOptions options;
+   protected final URI remote;
+   protected boolean secure; // true when the remote URI scheme is "wss"
+
+   private final AtomicBoolean connected = new AtomicBoolean();
+   private final AtomicBoolean closed = new AtomicBoolean(); // flipped once by close() so shutdown runs a single time
+   private ChannelPromise handshakeFuture; // awaited by connect() after the socket level connect
+   private IOException failureCause; // when set, connect() releases Netty resources and throws it
+   private Throwable pendingFailure; // fired into the pipeline by connect() once the connection succeeded
+
+   /**
+    * Create a new transport instance that has no listener assigned yet; one must
+    * be set before {@link #connect()} is called.
+    *
+    * @param remoteLocation the URI that defines the remote resource to connect to.
+    * @param options        the transport options used to configure the socket connection.
+    */
+   public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
+      this(null, remoteLocation, options);
+   }
+
+   /**
+    * Create a new transport instance.  The transport is treated as secure when
+    * the scheme of the remote URI is "wss" (case insensitive).
+    *
+    * @param listener       the TransportListener that will receive events from this Transport.
+    * @param remoteLocation the URI that defines the remote resource to connect to.
+    * @param options        the transport options used to configure the socket connection.
+    */
+   public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+      this.remote = remoteLocation;
+      this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
+      this.listener = listener;
+      this.options = options;
+   }
+
+   /**
+    * Connects to the remote peer and blocks until the socket level connect has
+    * finished and the WebSocket handshake future has completed.
+    *
+    * If a failure cause was recorded during the attempt, the channel and event
+    * loop group are released and the cause is thrown.  If the calling thread is
+    * interrupted while waiting, its interrupt status is restored before the
+    * failure is reported.
+    *
+    * @throws IOException           if the connection attempt fails or is interrupted.
+    * @throws IllegalStateException if no transport listener has been set.
+    */
+   @Override
+   public void connect() throws IOException {
+
+      if (listener == null) {
+         throw new IllegalStateException("A transport listener must be set before connection attempts.");
+      }
+
+      group = new NioEventLoopGroup(1);
+
+      bootstrap = new Bootstrap();
+      bootstrap.group(group);
+      bootstrap.channel(NioSocketChannel.class);
+      bootstrap.handler(new ChannelInitializer<Channel>() {
+
+         @Override
+         public void initChannel(Channel connectedChannel) throws Exception {
+            configureChannel(connectedChannel);
+         }
+      });
+
+      configureNetty(bootstrap, getTransportOptions());
+
+      ChannelFuture future;
+      try {
+         future = bootstrap.connect(getRemoteHost(), getRemotePort());
+         future.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+               if (future.isSuccess()) {
+                  handleConnected(future.channel());
+               }
+               else if (future.isCancelled()) {
+                  connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+               }
+               else {
+                  connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+               }
+            }
+         });
+
+         future.sync();
+
+         // Now wait for WS protocol level handshake completion
+         handshakeFuture.await();
+      }
+      catch (InterruptedException ex) {
+         LOG.debug("Transport connection attempt was interrupted.");
+         // Restore the interrupt status rather than discarding it, so code further
+         // up the call stack can still observe that the thread was interrupted.
+         Thread.currentThread().interrupt();
+         failureCause = IOExceptionSupport.create(ex);
+      }
+
+      if (failureCause != null) {
+         // Close out any Netty resources now as they are no longer needed.
+         if (channel != null) {
+            channel.close().syncUninterruptibly();
+            channel = null;
+         }
+         if (group != null) {
+            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            group = null;
+         }
+
+         throw failureCause;
+      }
+      else {
+         // Connected, allow any held async error to fire now and close the transport.
+         channel.eventLoop().execute(new Runnable() {
+
+            @Override
+            public void run() {
+               if (pendingFailure != null) {
+                  channel.pipeline().fireExceptionCaught(pendingFailure);
+               }
+            }
+         });
+      }
+   }
+
+   /**
+    * @return the current value of the connected flag.
+    */
+   @Override
+   public boolean isConnected() {
+      return this.connected.get();
+   }
+
+   /**
+    * @return true if this transport was created for a secure remote location.
+    */
+   @Override
+   public boolean isSSL() {
+      return this.secure;
+   }
+
+   @Override
+   public void close() throws IOException {
+      if (closed.compareAndSet(false, true)) {
+         connected.set(false);
+         if (channel != null) {
+            channel.close().syncUninterruptibly();
+         }
+         if (group != null) {
+            group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+         }
+      }
+   }
+
+   /**
+    * Allocates an I/O buffer from the channel's allocator whose initial and
+    * maximum capacity are both the requested size.
+    *
+    * @param size the capacity of the buffer to allocate.
+    * @return the newly allocated buffer.
+    * @throws IOException if the connected check fails.
+    */
+   @Override
+   public ByteBuf allocateSendBuffer(int size) throws IOException {
+      checkConnected();
+      final ByteBuf buffer = channel.alloc().ioBuffer(size, size);
+      return buffer;
+   }
+
+   /**
+    * Writes the readable bytes of the given buffer to the channel as a single
+    * binary WebSocket frame and flushes it.  Empty buffers are ignored.
+    *
+    * @param output the buffer holding the bytes to send.
+    * @throws IOException if the connected check fails.
+    */
+   @Override
+   public void send(ByteBuf output) throws IOException {
+      checkConnected();
+      final int length = output.readableBytes();
+      if (length != 0) {
+         LOG.trace("Attempted write of: {} bytes", length);
+
+         channel.writeAndFlush(new BinaryWebSocketFrame(output));
+      }
+   }
+
+   /**
+    * @return the listener currently assigned to this transport, may be null.
+    */
+   @Override
+   public NettyTransportListener getTransportListener() {
+      return this.listener;
+   }
+
+   /**
+    * @param listener the listener to assign to this transport.
+    */
+   @Override
+   public void setTransportListener(NettyTransportListener listener) {
+      this.listener = listener;
+   }
+
+   @Override
+   public NettyTransportOptions getTransportOptions() {
+      if (options == null) {
+         if (isSSL()) {
+            options = NettyTransportSslOptions.INSTANCE;
+         }
+         else {
+            options = NettyTransportOptions.INSTANCE;
+         }
+      }
+
+      return options;
+   }
+
+   /**
+    * @return the remote URI this transport was created with.
+    */
+   @Override
+   public URI getRemoteLocation() {
+      return this.remote;
+   }
+
+   /**
+    * Looks up the SslHandler in the channel pipeline and returns the local
+    * principal of its SSL session.
+    *
+    * @return the local principal reported by the SSL session.
+    * @throws UnsupportedOperationException if this transport is not secure.
+    */
+   @Override
+   public Principal getLocalPrincipal() {
+      if (isSSL()) {
+         final SslHandler handler = channel.pipeline().get(SslHandler.class);
+         return handler.engine().getSession().getLocalPrincipal();
+      }
+
+      throw new UnsupportedOperationException("Not connected to a secure channel");
+   }
+
+   //----- Internal implementation details, can be overridden as needed --//
+
+   protected String getRemoteHost() {
+      return remote.getHost();
+   }
+
+   protected int getRemotePort() {
+      int port = remote.getPort();
+
+      if (port <= 0) {
+         if (isSSL()) {
+            port = getSslOptions().getDefaultSslPort();
+         }
+         else {
+            port = getTransportOptions().getDefaultTcpPort();
+         }
+      }
+
+      return port;
+   }
+
+   protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+      bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+      bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+      bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+      bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+      bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+      if (options.getSendBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+      }
+
+      if (options.getReceiveBufferSize() != -1) {
+         bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+         bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+      }
+
+      if (options.getTrafficClass() != -1) {
+         bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+      }
+   }
+
+   protected void configureChannel(final Channel channel) throws Exception {
+      if (isSSL()) {
+         SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+         sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+            @Override
+            public void operationComplete(Future<Channel> future) throws Exception {
+               if (future.isSuccess()) {
+                  LOG.trace("SSL Handshake has completed: {}", channel);
+                  connectionEstablished(channel);
+               }
+               else {
+                  LOG.trace("SSL Handshake has failed: {}", channel);
+                  connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+               }
+            }
+         });
+
+         channel.pipeline().addLast(sslHandler);
+      }
+
+      channel.pipeline().addLast(new HttpClientCodec());
+      channel.pipeline().addLast(new HttpObjectAggregator(8192));
+      channel.pipeline().addLast(new NettyTcpTransportHandler());
+   }
+
+   /**
+    * Reacts to the channel having connected. For non-SSL transports the connection is
+    * reported as established right away; for SSL transports nothing is done here.
+    *
+    * @param channel the channel that has connected.
+    * @throws Exception declared for overriding implementations; not thrown by this one directly.
+    */
+   protected void handleConnected(final Channel channel) throws Exception {
+      if (isSSL()) {
+         return;
+      }
+
+      connectionEstablished(channel);
+   }
+
+   //----- State change handlers and checks ---------------------------------//
+
+   /**
+    * Records a successful connection: logs it, stores the channel and flips the
+    * connected flag to {@code true}.
+    *
+    * @param connectedChannel the Channel instance that is now connected.
+    */
+   protected void connectionEstablished(Channel connectedChannel) {
+      LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
+      // The channel is stored before the flag is raised, matching the order readers rely on.
+      this.channel = connectedChannel;
+      this.connected.set(true);
+   }
+
+   /**
+    * Called when the transport connection failed and an error should be returned.
+    * <p>
+    * Stores the failure cause and the failed channel, clears the connected flag and
+    * fails the pending handshake promise.
+    *
+    * @param failedChannel The Channel instance that failed.
+    * @param cause         An IOException that describes the cause of the failed connection.
+    */
+   protected void connectionFailed(Channel failedChannel, IOException cause) {
+      failureCause = IOExceptionSupport.create(cause);
+      channel = failedChannel;
+      connected.set(false);
+
+      // The handshake promise may already have been completed (for instance by the
+      // handler's exceptionCaught path). setFailure would then throw an
+      // IllegalStateException, so use tryFailure which is a no-op in that case.
+      handshakeFuture.tryFailure(cause);
+   }
+
+   private NettyTransportSslOptions getSslOptions() {
+      return (NettyTransportSslOptions) getTransportOptions();
+   }
+
+   /**
+    * Verifies that the connected flag is set.
+    *
+    * @throws IOException if the transport is not currently marked as connected.
+    */
+   private void checkConnected() throws IOException {
+      if (connected.get()) {
+         return;
+      }
+
+      throw new IOException("Cannot send to a non-connected transport.");
+   }
+
+   //----- Handle connection events -----------------------------------------//
+
+   private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
+
+      private final WebSocketClientHandshaker handshaker;
+
+      NettyTcpTransportHandler() {
+         handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
+      }
+
+      @Override
+      public void handlerAdded(ChannelHandlerContext context) {
+         LOG.trace("Handler has become added! Channel is {}", context.channel());
+         handshakeFuture = context.newPromise();
+      }
+
+      @Override
+      public void channelActive(ChannelHandlerContext context) throws Exception {
+         LOG.trace("Channel has become active! Channel is {}", context.channel());
+         handshaker.handshake(context.channel());
+      }
+
+      @Override
+      public void channelInactive(ChannelHandlerContext context) throws Exception {
+         LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+         if (connected.compareAndSet(true, false) && !closed.get()) {
+            LOG.trace("Firing onTransportClosed listener");
+            listener.onTransportClosed();
+         }
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+         LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
+         LOG.trace("Error Stack: ", cause);
+         if (connected.compareAndSet(true, false) && !closed.get()) {
+            LOG.trace("Firing onTransportError listener");
+            if (pendingFailure != null) {
+               listener.onTransportError(pendingFailure);
+            }
+            else {
+               listener.onTransportError(cause);
+            }
+         }
+         else {
+            // Hold the first failure for later dispatch if connect succeeds.
+            // This will then trigger disconnect using the first error reported.
+            if (pendingFailure != null) {
+               LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+               pendingFailure = cause;
+            }
+
+            if (!handshakeFuture.isDone()) {
+               handshakeFuture.setFailure(cause);
+            }
+         }
+      }
+
+      @Override
+      protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
+         LOG.trace("New data read: incoming: {}", message);
+
+         Channel ch = ctx.channel();
+         if (!handshaker.isHandshakeComplete()) {
+            handshaker.finishHandshake(ch, (FullHttpResponse) message);
+            LOG.info("WebSocket Client connected! {}", ctx.channel());
+            handshakeFuture.setSuccess();
+            return;
+         }
+
+         // We shouldn't get this since we handle the handshake previously.
+         if (message instanceof FullHttpResponse) {
+            FullHttpResponse response = (FullHttpResponse) message;
+            throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
+                                               ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
+         }
+
+         WebSocketFrame frame = (WebSocketFrame) message;
+         if (frame instanceof TextWebSocketFrame) {
+            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
+            LOG.warn("WebSocket Client received message: " + textFrame.text());
+            ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
+         }
+         else if (frame instanceof BinaryWebSocketFrame) {
+            BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
+            LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
+            listener.onData(binaryFrame.content());
+         }
+         else if (frame instanceof PongWebSocketFrame) {
+            LOG.trace("WebSocket Client received pong");
+         }
+         else if (frame instanceof CloseWebSocketFrame) {
+            LOG.trace("WebSocket Client received closing");
+            ch.close();
+         }
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
new file mode 100644
index 0000000..c3c4286
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+/**
+ * A {@link ByteBufAllocator} which is partial pooled. Which means only direct
+ * {@link ByteBuf}s are pooled. The rest is unpooled.
+ *
+ */
+public class PartialPooledByteBufAllocator implements ByteBufAllocator {
+
+   // Backing allocators: direct buffers come from POOLED, everything else from UNPOOLED.
+   private static final ByteBufAllocator POOLED = new PooledByteBufAllocator(false);
+   private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false);
+
+   /** Shared singleton instance; the class cannot be instantiated from outside. */
+   public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator();
+
+   private PartialPooledByteBufAllocator() {
+   }
+
+   /** Returns an unpooled heap buffer. */
+   @Override
+   public ByteBuf buffer() {
+      return UNPOOLED.heapBuffer();
+   }
+
+   /** Returns an unpooled heap buffer with the given initial capacity. */
+   @Override
+   public ByteBuf buffer(int initialCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   /** Returns an unpooled heap buffer with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** Returns an unpooled heap buffer; I/O buffers are not allocated as direct here. */
+   @Override
+   public ByteBuf ioBuffer() {
+      return UNPOOLED.heapBuffer();
+   }
+
+   /** Returns an unpooled heap buffer with the given initial capacity. */
+   @Override
+   public ByteBuf ioBuffer(int initialCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   /** Returns an unpooled heap buffer with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** Returns an unpooled heap buffer. */
+   @Override
+   public ByteBuf heapBuffer() {
+      return UNPOOLED.heapBuffer();
+   }
+
+   /** Returns an unpooled heap buffer with the given initial capacity. */
+   @Override
+   public ByteBuf heapBuffer(int initialCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   /** Returns an unpooled heap buffer with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** Returns a direct buffer from the pooled allocator. */
+   @Override
+   public ByteBuf directBuffer() {
+      return POOLED.directBuffer();
+   }
+
+   /** Returns a direct buffer from the pooled allocator with the given initial capacity. */
+   @Override
+   public ByteBuf directBuffer(int initialCapacity) {
+      return POOLED.directBuffer(initialCapacity);
+   }
+
+   /** Returns a direct buffer from the pooled allocator with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+      return POOLED.directBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** Returns an unpooled composite heap buffer. */
+   @Override
+   public CompositeByteBuf compositeBuffer() {
+      return UNPOOLED.compositeHeapBuffer();
+   }
+
+   /** Returns an unpooled composite heap buffer limited to the given number of components. */
+   @Override
+   public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
+   }
+
+   /** Returns an unpooled composite heap buffer. */
+   @Override
+   public CompositeByteBuf compositeHeapBuffer() {
+      return UNPOOLED.compositeHeapBuffer();
+   }
+
+   /** Returns an unpooled composite heap buffer limited to the given number of components. */
+   @Override
+   public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
+   }
+
+   /** Returns a composite direct buffer from the pooled allocator. */
+   @Override
+   public CompositeByteBuf compositeDirectBuffer() {
+      return POOLED.compositeDirectBuffer();
+   }
+
+   /**
+    * Returns a composite direct buffer from the pooled allocator limited to the given
+    * number of components.
+    */
+   @Override
+   public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+      // Forward the limit; it was previously dropped and the allocator default used instead.
+      return POOLED.compositeDirectBuffer(maxNumComponents);
+   }
+
+   /** Always reports {@code true}: direct buffers are served by the pooled allocator. */
+   @Override
+   public boolean isDirectBufferPooled() {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
new file mode 100644
index 0000000..42d6a0b
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.X509ExtendedKeyManager;
+import java.net.Socket;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+
+/**
+ * An X509ExtendedKeyManager wrapper which always chooses and only
+ * returns the given alias, and defers retrieval to the delegate
+ * key manager.
+ */
+public class X509AliasKeyManager extends X509ExtendedKeyManager {
+
+   private final X509ExtendedKeyManager delegate;
+   private final String alias;
+
+   /**
+    * Creates a key manager pinned to a single alias.
+    *
+    * @param alias    the only alias this manager will ever choose or report; must not be null.
+    * @param delegate the key manager that certificate chain and private key lookups are forwarded to.
+    * @throws IllegalArgumentException if the alias is null.
+    */
+   public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException {
+      if (alias == null) {
+         throw new IllegalArgumentException("The given key alias must not be null.");
+      }
+
+      this.delegate = delegate;
+      this.alias = alias;
+   }
+
+   // ----- Alias selection: the configured alias is always the answer -----
+
+   @Override
+   public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
+      return this.alias;
+   }
+
+   @Override
+   public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
+      return this.alias;
+   }
+
+   // ----- Key material lookups are forwarded to the delegate unchanged -----
+
+   @Override
+   public X509Certificate[] getCertificateChain(String alias) {
+      return delegate.getCertificateChain(alias);
+   }
+
+   @Override
+   public String[] getClientAliases(String keyType, Principal[] issuers) {
+      return singleAliasArray();
+   }
+
+   @Override
+   public PrivateKey getPrivateKey(String alias) {
+      return delegate.getPrivateKey(alias);
+   }
+
+   @Override
+   public String[] getServerAliases(String keyType, Principal[] issuers) {
+      return singleAliasArray();
+   }
+
+   @Override
+   public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
+      return this.alias;
+   }
+
+   @Override
+   public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
+      return this.alias;
+   }
+
+   /** Builds a fresh one-element array holding the configured alias. */
+   private String[] singleAliasArray() {
+      final String[] aliases = new String[1];
+      aliases[0] = this.alias;
+      return aliases;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
new file mode 100644
index 0000000..bb71746
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Defines a result interface for Asynchronous operations.
+ */
+public interface AsyncResult {
+
+   /**
+    * Invoked when the operation fails, passing the error that caused the failure.
+    *
+    * @param result The error that resulted in this asynchronous operation failing.
+    */
+   void onFailure(Throwable result);
+
+   /**
+    * Invoked when the operation completes successfully. No result value is
+    * carried by this callback; it only signals that the operation succeeded.
+    */
+   void onSuccess();
+
+   /**
+    * Returns true if the AsyncResult has completed.  The task is considered complete
+    * regardless of whether it succeeded or failed.
+    *
+    * @return true if the asynchronous operation has completed.
+    */
+   boolean isComplete();
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
new file mode 100644
index 0000000..12d38fd
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Asynchronous Client Future class.
+ */
+public class ClientFuture implements AsyncResult {
+
+   // Guards against completing the future more than once.
+   private final AtomicBoolean completer = new AtomicBoolean();
+   // Released exactly once, when the future completes either way.
+   private final CountDownLatch latch = new CountDownLatch(1);
+   // Optional callback run on completion, before waiters are released; may be null.
+   private final ClientFutureSynchronization synchronization;
+   // Failure cause, or null when the future is pending or succeeded.
+   private volatile Throwable error;
+
+   /**
+    * Creates a future without a completion callback.
+    */
+   public ClientFuture() {
+      this(null);
+   }
+
+   /**
+    * Creates a future that invokes the given callback when it completes.
+    *
+    * @param synchronization callback to run on completion, or null for none.
+    */
+   public ClientFuture(ClientFutureSynchronization synchronization) {
+      this.synchronization = synchronization;
+   }
+
+   @Override
+   public boolean isComplete() {
+      return latch.getCount() == 0;
+   }
+
+   /**
+    * Completes this future with a failure. Only the first completion (success or failure)
+    * has any effect; later calls are ignored.
+    *
+    * @param result the error that caused the operation to fail.
+    */
+   @Override
+   public void onFailure(Throwable result) {
+      if (completer.compareAndSet(false, true)) {
+         error = result;
+         try {
+            if (synchronization != null) {
+               synchronization.onPendingFailure(error);
+            }
+         }
+         finally {
+            // Always release waiters, even if the callback throws.
+            latch.countDown();
+         }
+      }
+   }
+
+   /**
+    * Completes this future successfully. Only the first completion (success or failure)
+    * has any effect; later calls are ignored.
+    */
+   @Override
+   public void onSuccess() {
+      if (completer.compareAndSet(false, true)) {
+         try {
+            if (synchronization != null) {
+               synchronization.onPendingSuccess();
+            }
+         }
+         finally {
+            // Always release waiters, even if the callback throws.
+            latch.countDown();
+         }
+      }
+   }
+
+   /**
+    * Timed wait for a response to a pending operation.
+    * <p>
+    * Note that if the wait period elapses before the operation completes, this method
+    * returns normally; callers that need to detect a timeout should check
+    * {@link #isComplete()} afterwards.
+    *
+    * @param amount The amount of time to wait before abandoning the wait.
+    * @param unit   The unit to use for this wait period.
+    * @throws IOException if the operation failed or the wait was interrupted.
+    */
+   public void sync(long amount, TimeUnit unit) throws IOException {
+      try {
+         latch.await(amount, unit);
+      }
+      catch (InterruptedException e) {
+         // Restore the interrupt status so code further up the stack can observe it.
+         Thread.currentThread().interrupt();
+         throw IOExceptionSupport.create(e);
+      }
+
+      failOnError();
+   }
+
+   /**
+    * Waits for a response to some pending operation.
+    *
+    * @throws IOException if the operation failed or the wait was interrupted.
+    */
+   public void sync() throws IOException {
+      try {
+         latch.await();
+      }
+      catch (InterruptedException e) {
+         // Restore the interrupt status so code further up the stack can observe it.
+         Thread.currentThread().interrupt();
+         throw IOExceptionSupport.create(e);
+      }
+
+      failOnError();
+   }
+
+   /**
+    * Throws the recorded failure, converted to an IOException, if there is one.
+    */
+   private void failOnError() throws IOException {
+      Throwable cause = error;
+      if (cause != null) {
+         throw IOExceptionSupport.create(cause);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
new file mode 100644
index 0000000..e279bc1
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Synchronization callback interface used to execute state updates
+ * or similar tasks in the thread context where the associated
+ * ProviderFuture is managed.
+ */
+public interface ClientFutureSynchronization {
+
+   /**
+    * Invoked by the associated ClientFuture when it completes successfully.
+    */
+   void onPendingSuccess();
+
+   /**
+    * Invoked by the associated ClientFuture when it completes with a failure.
+    *
+    * @param cause the error the future was failed with.
+    */
+   void onPendingFailure(Throwable cause);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java
new file mode 100644
index 0000000..70d88e6
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.IOException;
+
+/**
+ * Used to make throwing IOException instances easier.
+ */
+public class IOExceptionSupport {
+
+   /**
+    * Converts any throwable into an IOException. An IOException is handed back as is;
+    * anything else is wrapped in a new IOException that keeps the original as its cause.
+    * The wrapper's message is the original message, or the original's {@code toString()}
+    * when that message is null or empty.
+    *
+    * @param cause The initiating exception that should be cast or wrapped.
+    * @return an IOException instance.
+    */
+   public static IOException create(Throwable cause) {
+      if (!(cause instanceof IOException)) {
+         final String original = cause.getMessage();
+         final boolean usable = original != null && !original.isEmpty();
+         return new IOException(usable ? original : cause.toString(), cause);
+      }
+
+      return (IOException) cause;
+   }
+}