You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2016/07/20 09:35:38 UTC
[6/9] activemq-artemis git commit: ARTEMIS-637 Port 5.x AMQP test
client
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
new file mode 100644
index 0000000..f790433
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ */
+public class NettyTcpTransport implements NettyTransport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+ private static final int QUIET_PERIOD = 20;
+ private static final int SHUTDOWN_TIMEOUT = 100;
+
+ protected Bootstrap bootstrap;
+ protected EventLoopGroup group;
+ protected Channel channel;
+ protected NettyTransportListener listener;
+ protected NettyTransportOptions options;
+ protected final URI remote;
+ protected boolean secure;
+
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final CountDownLatch connectLatch = new CountDownLatch(1);
+ private IOException failureCause;
+ private Throwable pendingFailure;
+
+ /**
+ * Create a new transport instance
+ *
+ * @param remoteLocation the URI that defines the remote resource to connect to.
+ * @param options the transport options used to configure the socket connection.
+ */
+ public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
+ this(null, remoteLocation, options);
+ }
+
+ /**
+ * Create a new transport instance
+ *
+ * @param listener the TransportListener that will receive events from this Transport.
+ * @param remoteLocation the URI that defines the remote resource to connect to.
+ * @param options the transport options used to configure the socket connection.
+ */
+ public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+ this.options = options;
+ this.listener = listener;
+ this.remote = remoteLocation;
+ this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
+ }
+
+ @Override
+ public void connect() throws IOException {
+
+ if (listener == null) {
+ throw new IllegalStateException("A transport listener must be set before connection attempts.");
+ }
+
+ group = new NioEventLoopGroup(1);
+
+ bootstrap = new Bootstrap();
+ bootstrap.group(group);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.handler(new ChannelInitializer<Channel>() {
+
+ @Override
+ public void initChannel(Channel connectedChannel) throws Exception {
+ configureChannel(connectedChannel);
+ }
+ });
+
+ configureNetty(bootstrap, getTransportOptions());
+
+ ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
+ future.addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ handleConnected(future.channel());
+ }
+ else if (future.isCancelled()) {
+ connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+ }
+ else {
+ connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+ }
+ }
+ });
+
+ try {
+ connectLatch.await();
+ }
+ catch (InterruptedException ex) {
+ LOG.debug("Transport connection was interrupted.");
+ Thread.interrupted();
+ failureCause = IOExceptionSupport.create(ex);
+ }
+
+ if (failureCause != null) {
+ // Close out any Netty resources now as they are no longer needed.
+ if (channel != null) {
+ channel.close().syncUninterruptibly();
+ channel = null;
+ }
+ if (group != null) {
+ group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ group = null;
+ }
+
+ throw failureCause;
+ }
+ else {
+ // Connected, allow any held async error to fire now and close the transport.
+ channel.eventLoop().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ if (pendingFailure != null) {
+ channel.pipeline().fireExceptionCaught(pendingFailure);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ return connected.get();
+ }
+
+ @Override
+ public boolean isSSL() {
+ return secure;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ connected.set(false);
+ if (channel != null) {
+ channel.close().syncUninterruptibly();
+ }
+ if (group != null) {
+ group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ @Override
+ public ByteBuf allocateSendBuffer(int size) throws IOException {
+ checkConnected();
+ return channel.alloc().ioBuffer(size, size);
+ }
+
+ @Override
+ public void send(ByteBuf output) throws IOException {
+ checkConnected();
+ int length = output.readableBytes();
+ if (length == 0) {
+ return;
+ }
+
+ LOG.trace("Attempted write of: {} bytes", length);
+
+ channel.writeAndFlush(output);
+ }
+
+ @Override
+ public NettyTransportListener getTransportListener() {
+ return listener;
+ }
+
+ @Override
+ public void setTransportListener(NettyTransportListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public NettyTransportOptions getTransportOptions() {
+ if (options == null) {
+ if (isSSL()) {
+ options = NettyTransportSslOptions.INSTANCE;
+ }
+ else {
+ options = NettyTransportOptions.INSTANCE;
+ }
+ }
+
+ return options;
+ }
+
+ @Override
+ public URI getRemoteLocation() {
+ return remote;
+ }
+
+ @Override
+ public Principal getLocalPrincipal() {
+ if (!isSSL()) {
+ throw new UnsupportedOperationException("Not connected to a secure channel");
+ }
+
+ SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+
+ return sslHandler.engine().getSession().getLocalPrincipal();
+ }
+
+ //----- Internal implementation details, can be overridden as needed --//
+
+ protected String getRemoteHost() {
+ return remote.getHost();
+ }
+
+ protected int getRemotePort() {
+ int port = remote.getPort();
+
+ if (port <= 0) {
+ if (isSSL()) {
+ port = getSslOptions().getDefaultSslPort();
+ }
+ else {
+ port = getTransportOptions().getDefaultTcpPort();
+ }
+ }
+
+ return port;
+ }
+
+ protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+ bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+ bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+ bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+ if (options.getSendBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+ }
+
+ if (options.getReceiveBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+ bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+ }
+
+ if (options.getTrafficClass() != -1) {
+ bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+ }
+ }
+
+ protected void configureChannel(final Channel channel) throws Exception {
+ if (isSSL()) {
+ SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+ sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+ @Override
+ public void operationComplete(Future<Channel> future) throws Exception {
+ if (future.isSuccess()) {
+ LOG.trace("SSL Handshake has completed: {}", channel);
+ connectionEstablished(channel);
+ }
+ else {
+ LOG.trace("SSL Handshake has failed: {}", channel);
+ connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+ }
+ }
+ });
+
+ channel.pipeline().addLast(sslHandler);
+ }
+
+ channel.pipeline().addLast(new NettyTcpTransportHandler());
+ }
+
+ protected void handleConnected(final Channel channel) throws Exception {
+ if (!isSSL()) {
+ connectionEstablished(channel);
+ }
+ }
+
+ //----- State change handlers and checks ---------------------------------//
+
+ /**
+ * Called when the transport has successfully connected and is ready for use.
+ */
+ protected void connectionEstablished(Channel connectedChannel) {
+ channel = connectedChannel;
+ connected.set(true);
+ connectLatch.countDown();
+ }
+
+ /**
+ * Called when the transport connection failed and an error should be returned.
+ *
+ * @param failedChannel The Channel instance that failed.
+ * @param cause An IOException that describes the cause of the failed connection.
+ */
+ protected void connectionFailed(Channel failedChannel, IOException cause) {
+ failureCause = IOExceptionSupport.create(cause);
+ channel = failedChannel;
+ connected.set(false);
+ connectLatch.countDown();
+ }
+
+ private NettyTransportSslOptions getSslOptions() {
+ return (NettyTransportSslOptions) getTransportOptions();
+ }
+
+ private void checkConnected() throws IOException {
+ if (!connected.get()) {
+ throw new IOException("Cannot send to a non-connected transport.");
+ }
+ }
+
+ //----- Handle connection events -----------------------------------------//
+
+ private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+ @Override
+ public void channelActive(ChannelHandlerContext context) throws Exception {
+ LOG.trace("Channel has become active! Channel is {}", context.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext context) throws Exception {
+ LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+ if (connected.compareAndSet(true, false) && !closed.get()) {
+ LOG.trace("Firing onTransportClosed listener");
+ listener.onTransportClosed();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+ LOG.trace("Exception on channel! Channel is {}", context.channel());
+ if (connected.compareAndSet(true, false) && !closed.get()) {
+ LOG.trace("Firing onTransportError listener");
+ if (pendingFailure != null) {
+ listener.onTransportError(pendingFailure);
+ }
+ else {
+ listener.onTransportError(cause);
+ }
+ }
+ else {
+ // Hold the first failure for later dispatch if connect succeeds.
+ // This will then trigger disconnect using the first error reported.
+ if (pendingFailure != null) {
+ LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+ pendingFailure = cause;
+ }
+ }
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+ LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
+ listener.onData(buffer);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
new file mode 100644
index 0000000..a2bacdc
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Contract for the Netty based transports created by NettyTransportFactory
+ * (NettyTcpTransport for the tcp/ssl schemes, NettyWSTransport for ws/wss).
+ * The behaviour notes below describe NettyTcpTransport as added in this commit;
+ * other implementations are presumably equivalent - verify before relying on it.
+ */
+public interface NettyTransport {
+
+ /**
+ * Connects to the remote location. NettyTcpTransport requires a listener to be
+ * set beforehand and blocks until the attempt has succeeded or failed.
+ *
+ * @throws IOException if the connection could not be established.
+ */
+ void connect() throws IOException;
+
+ /**
+ * @return true while the transport is connected.
+ */
+ boolean isConnected();
+
+ /**
+ * @return true if this transport uses a secure (SSL) connection.
+ */
+ boolean isSSL();
+
+ /**
+ * Closes the transport and releases its resources.
+ *
+ * @throws IOException if an error occurs while closing.
+ */
+ void close() throws IOException;
+
+ /**
+ * Allocates a buffer of the given size for use with {@link #send(ByteBuf)}.
+ *
+ * @param size the capacity of the buffer in bytes.
+ * @return the newly allocated buffer.
+ * @throws IOException if the transport is not connected.
+ */
+ ByteBuf allocateSendBuffer(int size) throws IOException;
+
+ /**
+ * Writes the given buffer to the remote peer.
+ *
+ * @param output the data to send.
+ * @throws IOException if the transport is not connected.
+ */
+ void send(ByteBuf output) throws IOException;
+
+ /**
+ * @return the listener that receives events from this transport, may be null.
+ */
+ NettyTransportListener getTransportListener();
+
+ /**
+ * @param listener the listener that will receive events from this transport.
+ */
+ void setTransportListener(NettyTransportListener listener);
+
+ /**
+ * @return the options used to configure this transport.
+ */
+ NettyTransportOptions getTransportOptions();
+
+ /**
+ * @return the URI of the remote resource this transport connects to.
+ */
+ URI getRemoteLocation();
+
+ /**
+ * @return the local principal of the secure session; NettyTcpTransport throws
+ * UnsupportedOperationException when the transport is not SSL.
+ */
+ Principal getLocalPrincipal();
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
new file mode 100644
index 0000000..5663713
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.activemq.transport.amqp.client.util.PropertyUtil;
+
+/**
+ * Factory for creating the Netty based transports: TCP for the "tcp" and "ssl"
+ * schemes, WebSocket for the "ws" and "wss" schemes.
+ */
+public final class NettyTransportFactory {
+
+   private NettyTransportFactory() {
+   }
+
+   /**
+    * Creates an instance of the given Transport and configures it using the
+    * properties set on the given remote broker URI.
+    * <p>
+    * Query parameters prefixed with "transport." are applied to the transport options;
+    * the secure schemes ("ssl", "wss") get a copy of the SSL option defaults, every
+    * other scheme a copy of the plain TCP defaults.
+    *
+    * @param remoteURI The URI used to connect to a remote Peer.
+    * @return a new Transport instance.
+    * @throws IllegalArgumentException if the URI has no scheme or an unsupported one, or
+    *                                  if a "transport." option could not be applied.
+    * @throws Exception if an error occurs while creating the Transport instance.
+    */
+   public static NettyTransport createTransport(URI remoteURI) throws Exception {
+      Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
+      Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport.");
+
+      remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
+
+      // A URI such as "//host:5672" has no scheme; report that the same way as an
+      // unsupported scheme instead of failing with a bare NullPointerException.
+      final String rawScheme = remoteURI.getScheme();
+      if (rawScheme == null) {
+         throw new IllegalArgumentException("Invalid URI Scheme: " + rawScheme);
+      }
+
+      // Normalise once, independent of the JVM default locale.
+      final String scheme = rawScheme.toLowerCase(java.util.Locale.ROOT);
+      final boolean secure = scheme.equals("ssl") || scheme.equals("wss");
+
+      final NettyTransportOptions transportOptions;
+      if (secure) {
+         transportOptions = NettyTransportSslOptions.INSTANCE.clone();
+      }
+      else {
+         transportOptions = NettyTransportOptions.INSTANCE.clone();
+      }
+
+      Map<String, String> unused = PropertyUtil.setProperties(transportOptions, transportURIOptions);
+      if (!unused.isEmpty()) {
+         String msg = " Not all transport options could be set on the TCP based" +
+            " Transport. Check the options are spelled correctly." +
+            " Unused parameters=[" + unused + "]." +
+            " This provider instance cannot be started.";
+         throw new IllegalArgumentException(msg);
+      }
+
+      final NettyTransport result;
+
+      switch (scheme) {
+         case "tcp":
+         case "ssl":
+            result = new NettyTcpTransport(remoteURI, transportOptions);
+            break;
+         case "ws":
+         case "wss":
+            result = new NettyWSTransport(remoteURI, transportOptions);
+            break;
+         default:
+            throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
+      }
+
+      return result;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
new file mode 100644
index 0000000..c23ca8c
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import io.netty.buffer.ByteBuf;
+
+/**
+ * Listener interface that should be implemented by users of the Netty based
+ * Transport classes in this package.
+ */
+public interface NettyTransportListener {
+
+ /**
+ * Called when new incoming data has become available.
+ * <p>
+ * NOTE(review): NettyTcpTransport delivers the buffer from a
+ * SimpleChannelInboundHandler, which by default releases the message once the
+ * read callback returns - presumably implementations must copy or retain the
+ * buffer if they need it afterwards; confirm against the Netty documentation.
+ *
+ * @param incoming the next incoming packet of data.
+ */
+ void onData(ByteBuf incoming);
+
+ /**
+ * Called if the connection state becomes closed.
+ */
+ void onTransportClosed();
+
+ /**
+ * Called when an error occurs during normal Transport operations.
+ *
+ * @param cause the error that triggered this event.
+ */
+ void onTransportError(Throwable cause);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
new file mode 100644
index 0000000..3ffb8c8
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+/**
+ * Encapsulates all the TCP Transport options in one configuration object.
+ * <p>
+ * Instances are mutable; {@link #clone()} produces an independent copy carrying
+ * the same option values.
+ */
+public class NettyTransportOptions implements Cloneable {
+
+   public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
+   public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
+   public static final int DEFAULT_TRAFFIC_CLASS = 0;
+   public static final boolean DEFAULT_TCP_NO_DELAY = true;
+   public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
+   public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
+   public static final int DEFAULT_SO_TIMEOUT = -1;
+   public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
+   public static final int DEFAULT_TCP_PORT = 5672;
+
+   /** Shared instance holding the default values. */
+   public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
+
+   private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
+   private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
+   private int trafficClass = DEFAULT_TRAFFIC_CLASS;
+   private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
+   private int soTimeout = DEFAULT_SO_TIMEOUT;
+   private int soLinger = DEFAULT_SO_LINGER;
+   private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
+   private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
+   private int defaultTcpPort = DEFAULT_TCP_PORT;
+
+   /**
+    * @return the currently set send buffer size in bytes.
+    */
+   public int getSendBufferSize() {
+      return sendBufferSize;
+   }
+
+   /**
+    * Sets the send buffer size in bytes, the value must be greater than zero
+    * or an {@link IllegalArgumentException} will be thrown.
+    *
+    * @param sendBufferSize the new send buffer size for the TCP Transport.
+    * @throws IllegalArgumentException if the value given is not in the valid range.
+    */
+   public void setSendBufferSize(int sendBufferSize) {
+      if (sendBufferSize <= 0) {
+         throw new IllegalArgumentException("The send buffer size must be > 0");
+      }
+
+      this.sendBufferSize = sendBufferSize;
+   }
+
+   /**
+    * @return the currently configured receive buffer size in bytes.
+    */
+   public int getReceiveBufferSize() {
+      return receiveBufferSize;
+   }
+
+   /**
+    * Sets the receive buffer size in bytes, the value must be greater than zero
+    * or an {@link IllegalArgumentException} will be thrown.
+    *
+    * @param receiveBufferSize the new receive buffer size for the TCP Transport.
+    * @throws IllegalArgumentException if the value given is not in the valid range.
+    */
+   public void setReceiveBufferSize(int receiveBufferSize) {
+      if (receiveBufferSize <= 0) {
+         // The message previously named the send buffer, pointing users at the wrong option.
+         throw new IllegalArgumentException("The receive buffer size must be > 0");
+      }
+
+      this.receiveBufferSize = receiveBufferSize;
+   }
+
+   /**
+    * @return the currently configured traffic class value.
+    */
+   public int getTrafficClass() {
+      return trafficClass;
+   }
+
+   /**
+    * Sets the traffic class value used by the TCP connection, valid
+    * range is between 0 and 255.
+    *
+    * @param trafficClass the new traffic class value.
+    * @throws IllegalArgumentException if the value given is not in the valid range.
+    */
+   public void setTrafficClass(int trafficClass) {
+      if (trafficClass < 0 || trafficClass > 255) {
+         throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
+      }
+
+      this.trafficClass = trafficClass;
+   }
+
+   /**
+    * @return the configured socket timeout value.
+    */
+   public int getSoTimeout() {
+      return soTimeout;
+   }
+
+   public void setSoTimeout(int soTimeout) {
+      this.soTimeout = soTimeout;
+   }
+
+   /**
+    * @return true if the TCP no-delay option is enabled.
+    */
+   public boolean isTcpNoDelay() {
+      return tcpNoDelay;
+   }
+
+   public void setTcpNoDelay(boolean tcpNoDelay) {
+      this.tcpNoDelay = tcpNoDelay;
+   }
+
+   /**
+    * @return the configured socket linger value.
+    */
+   public int getSoLinger() {
+      return soLinger;
+   }
+
+   public void setSoLinger(int soLinger) {
+      this.soLinger = soLinger;
+   }
+
+   /**
+    * @return true if TCP keep-alive is enabled.
+    */
+   public boolean isTcpKeepAlive() {
+      return tcpKeepAlive;
+   }
+
+   public void setTcpKeepAlive(boolean keepAlive) {
+      this.tcpKeepAlive = keepAlive;
+   }
+
+   /**
+    * @return the configured connect timeout value.
+    */
+   public int getConnectTimeout() {
+      return connectTimeout;
+   }
+
+   public void setConnectTimeout(int connectTimeout) {
+      this.connectTimeout = connectTimeout;
+   }
+
+   /**
+    * @return the TCP port used when the remote URI does not specify one.
+    */
+   public int getDefaultTcpPort() {
+      return defaultTcpPort;
+   }
+
+   public void setDefaultTcpPort(int defaultTcpPort) {
+      this.defaultTcpPort = defaultTcpPort;
+   }
+
+   /**
+    * @return a new options instance carrying the same values as this one.
+    */
+   @Override
+   public NettyTransportOptions clone() {
+      return copyOptions(new NettyTransportOptions());
+   }
+
+   /**
+    * Copies every option held by this instance onto the given target.
+    *
+    * @param copy the instance that receives the option values.
+    * @return the instance that was passed in, for chaining.
+    */
+   protected NettyTransportOptions copyOptions(NettyTransportOptions copy) {
+      copy.setConnectTimeout(getConnectTimeout());
+      copy.setReceiveBufferSize(getReceiveBufferSize());
+      copy.setSendBufferSize(getSendBufferSize());
+      copy.setSoLinger(getSoLinger());
+      copy.setSoTimeout(getSoTimeout());
+      copy.setTcpKeepAlive(isTcpKeepAlive());
+      copy.setTcpNoDelay(isTcpNoDelay());
+      copy.setTrafficClass(getTrafficClass());
+      // Previously omitted, which made clone() silently reset a customised default port.
+      copy.setDefaultTcpPort(getDefaultTcpPort());
+
+      return copy;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
new file mode 100644
index 0000000..e256fbb
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Holds the defined SSL options for connections that operate over a secure
+ * transport. Options are read from the environment and can be overridden by
+ * specifying them on the connection URI.
+ */
+public class NettyTransportSslOptions extends NettyTransportOptions {
+
+ public static final String DEFAULT_STORE_TYPE = "jks";
+ public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
+ public static final boolean DEFAULT_TRUST_ALL = false;
+ public static final boolean DEFAULT_VERIFY_HOST = false;
+ public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
+ public static final int DEFAULT_SSL_PORT = 5671;
+
+ public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
+
+ private String keyStoreLocation;
+ private String keyStorePassword;
+ private String trustStoreLocation;
+ private String trustStorePassword;
+ private String storeType = DEFAULT_STORE_TYPE;
+ private String[] enabledCipherSuites;
+ private String[] disabledCipherSuites;
+ private String[] enabledProtocols;
+ private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]);
+ private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL;
+
+ private boolean trustAll = DEFAULT_TRUST_ALL;
+ private boolean verifyHost = DEFAULT_VERIFY_HOST;
+ private String keyAlias;
+ private int defaultSslPort = DEFAULT_SSL_PORT;
+
+ static {
+ INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
+ INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
+ INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore"));
+ INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
+ }
+
+ /**
+ * @return the keyStoreLocation currently configured.
+ */
+ public String getKeyStoreLocation() {
+ return keyStoreLocation;
+ }
+
+ /**
+ * Sets the location on disk of the key store to use.
+ *
+ * @param keyStoreLocation the keyStoreLocation to use to create the key manager.
+ */
+ public void setKeyStoreLocation(String keyStoreLocation) {
+ this.keyStoreLocation = keyStoreLocation;
+ }
+
+ /**
+ * @return the keyStorePassword
+ */
+ public String getKeyStorePassword() {
+ return keyStorePassword;
+ }
+
+ /**
+ * @param keyStorePassword the keyStorePassword to set
+ */
+ public void setKeyStorePassword(String keyStorePassword) {
+ this.keyStorePassword = keyStorePassword;
+ }
+
+ /**
+ * @return the trustStoreLocation
+ */
+ public String getTrustStoreLocation() {
+ return trustStoreLocation;
+ }
+
+ /**
+ * @param trustStoreLocation the trustStoreLocation to set
+ */
+ public void setTrustStoreLocation(String trustStoreLocation) {
+ this.trustStoreLocation = trustStoreLocation;
+ }
+
+ /**
+ * @return the trustStorePassword
+ */
+ public String getTrustStorePassword() {
+ return trustStorePassword;
+ }
+
+ /**
+ * @param trustStorePassword the trustStorePassword to set
+ */
+ public void setTrustStorePassword(String trustStorePassword) {
+ this.trustStorePassword = trustStorePassword;
+ }
+
+ /**
+ * @return the storeType
+ */
+ public String getStoreType() {
+ return storeType;
+ }
+
+ /**
+ * @param storeType the format that the store files are encoded in.
+ */
+ public void setStoreType(String storeType) {
+ this.storeType = storeType;
+ }
+
+ /**
+ * @return the enabledCipherSuites
+ */
+ public String[] getEnabledCipherSuites() {
+ return enabledCipherSuites;
+ }
+
+ /**
+ * @param enabledCipherSuites the enabledCipherSuites to set
+ */
+ public void setEnabledCipherSuites(String[] enabledCipherSuites) {
+ this.enabledCipherSuites = enabledCipherSuites;
+ }
+
+ /**
+ * @return the disabledCipherSuites
+ */
+ public String[] getDisabledCipherSuites() {
+ return disabledCipherSuites;
+ }
+
+ /**
+ * @param disabledCipherSuites the disabledCipherSuites to set
+ */
+ public void setDisabledCipherSuites(String[] disabledCipherSuites) {
+ this.disabledCipherSuites = disabledCipherSuites;
+ }
+
+ /**
+ * @return the enabledProtocols or null if the defaults should be used
+ */
+ public String[] getEnabledProtocols() {
+ return enabledProtocols;
+ }
+
+ /**
+ * The protocols to be set as enabled.
+ *
+ * @param enabledProtocols the enabled protocols to set, or null if the defaults should be used.
+ */
+ public void setEnabledProtocols(String[] enabledProtocols) {
+ this.enabledProtocols = enabledProtocols;
+ }
+
+ /**
+ * @return the protocols to disable or null if none should be
+ */
+ public String[] getDisabledProtocols() {
+ return disabledProtocols;
+ }
+
+ /**
+ * The protocols to be disable.
+ *
+ * @param disabledProtocols the protocols to disable, or null if none should be.
+ */
+ public void setDisabledProtocols(String[] disabledProtocols) {
+ this.disabledProtocols = disabledProtocols;
+ }
+
+ /**
+ * @return the context protocol to use
+ */
+ public String getContextProtocol() {
+ return contextProtocol;
+ }
+
+ /**
+ * The protocol value to use when creating an SSLContext via
+ * SSLContext.getInstance(protocol).
+ *
+ * @param contextProtocol the context protocol to use.
+ */
+ public void setContextProtocol(String contextProtocol) {
+ this.contextProtocol = contextProtocol;
+ }
+
+ /**
+ * @return the trustAll
+ */
+ public boolean isTrustAll() {
+ return trustAll;
+ }
+
+ /**
+ * @param trustAll the trustAll to set
+ */
+ public void setTrustAll(boolean trustAll) {
+ this.trustAll = trustAll;
+ }
+
+ /**
+ * @return the verifyHost
+ */
+ public boolean isVerifyHost() {
+ return verifyHost;
+ }
+
+ /**
+ * @param verifyHost the verifyHost to set
+ */
+ public void setVerifyHost(boolean verifyHost) {
+ this.verifyHost = verifyHost;
+ }
+
+ /**
+ * @return the key alias
+ */
+ public String getKeyAlias() {
+ return keyAlias;
+ }
+
+ /**
+ * @param keyAlias the key alias to use
+ */
+ public void setKeyAlias(String keyAlias) {
+ this.keyAlias = keyAlias;
+ }
+
+ public int getDefaultSslPort() {
+ return defaultSslPort;
+ }
+
+ public void setDefaultSslPort(int defaultSslPort) {
+ this.defaultSslPort = defaultSslPort;
+ }
+
+ @Override
+ public NettyTransportSslOptions clone() {
+ return copyOptions(new NettyTransportSslOptions());
+ }
+
+ protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) {
+ super.copyOptions(copy);
+
+ copy.setKeyStoreLocation(getKeyStoreLocation());
+ copy.setKeyStorePassword(getKeyStorePassword());
+ copy.setTrustStoreLocation(getTrustStoreLocation());
+ copy.setTrustStorePassword(getTrustStorePassword());
+ copy.setStoreType(getStoreType());
+ copy.setEnabledCipherSuites(getEnabledCipherSuites());
+ copy.setDisabledCipherSuites(getDisabledCipherSuites());
+ copy.setEnabledProtocols(getEnabledProtocols());
+ copy.setDisabledProtocols(getDisabledProtocols());
+ copy.setTrustAll(isTrustAll());
+ copy.setVerifyHost(isVerifyHost());
+ copy.setKeyAlias(getKeyAlias());
+ copy.setContextProtocol(getContextProtocol());
+ return copy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
new file mode 100644
index 0000000..51cedea
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509ExtendedKeyManager;
+import javax.net.ssl.X509TrustManager;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URI;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import io.netty.handler.ssl.SslHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Static class that provides various utility methods used by Transport implementations.
+ */
+public class NettyTransportSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
+
+ /**
+ * Creates a Netty SslHandler instance for use in Transports that require
+ * an SSL encoder / decoder.
+ *
+ * @param remote The URI of the remote peer that the SslHandler will be used against.
+ * @param options The SSL options object to build the SslHandler instance from.
+ * @return a new SslHandler that is configured from the given options.
+ * @throws Exception if an error occurs while creating the SslHandler instance.
+ */
+ public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
+ return new SslHandler(createSslEngine(remote, createSslContext(options), options));
+ }
+
+ /**
+ * Create a new SSLContext using the options specific in the given TransportSslOptions
+ * instance.
+ *
+ * @param options the configured options used to create the SSLContext.
+ * @return a new SSLContext instance.
+ * @throws Exception if an error occurs while creating the context.
+ */
+ public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
+ try {
+ String contextProtocol = options.getContextProtocol();
+ LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol);
+
+ SSLContext context = SSLContext.getInstance(contextProtocol);
+ KeyManager[] keyMgrs = loadKeyManagers(options);
+ TrustManager[] trustManagers = loadTrustManagers(options);
+
+ context.init(keyMgrs, trustManagers, new SecureRandom());
+ return context;
+ }
+ catch (Exception e) {
+ LOG.error("Failed to create SSLContext: {}", e, e);
+ throw e;
+ }
+ }
+
+ /**
+ * Create a new SSLEngine instance in client mode from the given SSLContext and
+ * TransportSslOptions instances.
+ *
+ * @param context the SSLContext to use when creating the engine.
+ * @param options the TransportSslOptions to use to configure the new SSLEngine.
+ * @return a new SSLEngine instance in client mode.
+ * @throws Exception if an error occurs while creating the new SSLEngine.
+ */
+ public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
+ return createSslEngine(null, context, options);
+ }
+
+ /**
+ * Create a new SSLEngine instance in client mode from the given SSLContext and
+ * TransportSslOptions instances.
+ *
+ * @param remote the URI of the remote peer that will be used to initialize the engine, may be null if none should.
+ * @param context the SSLContext to use when creating the engine.
+ * @param options the TransportSslOptions to use to configure the new SSLEngine.
+ * @return a new SSLEngine instance in client mode.
+ * @throws Exception if an error occurs while creating the new SSLEngine.
+ */
+ public static SSLEngine createSslEngine(URI remote,
+ SSLContext context,
+ NettyTransportSslOptions options) throws Exception {
+ SSLEngine engine = null;
+ if (remote == null) {
+ engine = context.createSSLEngine();
+ }
+ else {
+ engine = context.createSSLEngine(remote.getHost(), remote.getPort());
+ }
+
+ engine.setEnabledProtocols(buildEnabledProtocols(engine, options));
+ engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options));
+ engine.setUseClientMode(true);
+
+ if (options.isVerifyHost()) {
+ SSLParameters sslParameters = engine.getSSLParameters();
+ sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+ engine.setSSLParameters(sslParameters);
+ }
+
+ return engine;
+ }
+
+ private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) {
+ List<String> enabledProtocols = new ArrayList<>();
+
+ if (options.getEnabledProtocols() != null) {
+ List<String> configuredProtocols = Arrays.asList(options.getEnabledProtocols());
+ LOG.trace("Configured protocols from transport options: {}", configuredProtocols);
+ enabledProtocols.addAll(configuredProtocols);
+ }
+ else {
+ List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols());
+ LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols);
+ enabledProtocols.addAll(engineProtocols);
+ }
+
+ String[] disabledProtocols = options.getDisabledProtocols();
+ if (disabledProtocols != null) {
+ List<String> disabled = Arrays.asList(disabledProtocols);
+ LOG.trace("Disabled protocols: {}", disabled);
+ enabledProtocols.removeAll(disabled);
+ }
+
+ LOG.trace("Enabled protocols: {}", enabledProtocols);
+
+ return enabledProtocols.toArray(new String[0]);
+ }
+
+ private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) {
+ List<String> enabledCipherSuites = new ArrayList<>();
+
+ if (options.getEnabledCipherSuites() != null) {
+ List<String> configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites());
+ LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites);
+ enabledCipherSuites.addAll(configuredCipherSuites);
+ }
+ else {
+ List<String> engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
+ LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites);
+ enabledCipherSuites.addAll(engineCipherSuites);
+ }
+
+ String[] disabledCipherSuites = options.getDisabledCipherSuites();
+ if (disabledCipherSuites != null) {
+ List<String> disabled = Arrays.asList(disabledCipherSuites);
+ LOG.trace("Disabled cipher suites: {}", disabled);
+ enabledCipherSuites.removeAll(disabled);
+ }
+
+ LOG.trace("Enabled cipher suites: {}", enabledCipherSuites);
+
+ return enabledCipherSuites.toArray(new String[0]);
+ }
+
+ private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
+ if (options.isTrustAll()) {
+ return new TrustManager[]{createTrustAllTrustManager()};
+ }
+
+ if (options.getTrustStoreLocation() == null) {
+ return null;
+ }
+
+ TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+
+ String storeLocation = options.getTrustStoreLocation();
+ String storePassword = options.getTrustStorePassword();
+ String storeType = options.getStoreType();
+
+ LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType);
+
+ KeyStore trustStore = loadStore(storeLocation, storePassword, storeType);
+ fact.init(trustStore);
+
+ return fact.getTrustManagers();
+ }
+
+ private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception {
+ if (options.getKeyStoreLocation() == null) {
+ return null;
+ }
+
+ KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+
+ String storeLocation = options.getKeyStoreLocation();
+ String storePassword = options.getKeyStorePassword();
+ String storeType = options.getStoreType();
+ String alias = options.getKeyAlias();
+
+ LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
+
+ KeyStore keyStore = loadStore(storeLocation, storePassword, storeType);
+ fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null);
+
+ if (alias == null) {
+ return fact.getKeyManagers();
+ }
+ else {
+ validateAlias(keyStore, alias);
+ return wrapKeyManagers(alias, fact.getKeyManagers());
+ }
+ }
+
+ private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) {
+ KeyManager[] keyManagers = new KeyManager[origKeyManagers.length];
+ for (int i = 0; i < origKeyManagers.length; i++) {
+ KeyManager km = origKeyManagers[i];
+ if (km instanceof X509ExtendedKeyManager) {
+ km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km);
+ }
+
+ keyManagers[i] = km;
+ }
+
+ return keyManagers;
+ }
+
+ private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException {
+ if (!store.containsAlias(alias)) {
+ throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store");
+ }
+
+ if (!store.isKeyEntry(alias)) {
+ throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry");
+ }
+ }
+
+ private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception {
+ KeyStore store = KeyStore.getInstance(storeType);
+ try (InputStream in = new FileInputStream(new File(storePath));) {
+ store.load(in, password != null ? password.toCharArray() : null);
+ }
+
+ return store;
+ }
+
+ private static TrustManager createTrustAllTrustManager() {
+ return new X509TrustManager() {
+ @Override
+ public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return new X509Certificate[0];
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
new file mode 100644
index 0000000..b28f523
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -0,0 +1,472 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Transport for communicating over WebSockets
+ */
+public class NettyWSTransport implements NettyTransport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
+
+ private static final int QUIET_PERIOD = 20; // quiet period in milliseconds handed to group.shutdownGracefully()
+ private static final int SHUTDOWN_TIMEOUT = 100; // timeout in milliseconds handed to group.shutdownGracefully()
+
+ protected Bootstrap bootstrap; // created by connect()
+ protected EventLoopGroup group; // created by connect(), shut down on close() or on a failed connect
+ protected Channel channel; // assigned by connectionEstablished() and connectionFailed()
+ protected NettyTransportListener listener; // receives the onTransportClosed / onTransportError events
+ protected NettyTransportOptions options; // when null, getTransportOptions() falls back to a shared INSTANCE
+ protected final URI remote;
+ protected boolean secure; // true when the scheme of the remote URI is "wss"
+
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private ChannelPromise handshakeFuture; // created in handlerAdded(), awaited by connect()
+ private IOException failureCause; // when set, connect() releases the Netty resources and throws it
+ private Throwable pendingFailure; // error held back until connect() has succeeded
+
+ /**
+  * Creates a transport without a listener. connect() refuses to run until a
+  * listener has been assigned through setTransportListener.
+  *
+  * @param remoteLocation the URI that defines the remote resource to connect to.
+  * @param options        the transport options used to configure the socket connection.
+  */
+ public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
+    this((NettyTransportListener) null, remoteLocation, options);
+ }
+
+ /**
+ * Create a new transport instance
+ *
+ * @param listener the TransportListener that will receive events from this Transport.
+ * @param remoteLocation the URI that defines the remote resource to connect to.
+ * @param options the transport options used to configure the socket connection.
+ */
+ public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+ this.options = options;
+ this.listener = listener;
+ this.remote = remoteLocation;
+ this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
+ }
+
+ /**
+  * Opens the connection to the remote peer and blocks until the attempt has either
+  * completed, including the WebSocket handshake, or failed.
+  * <p>
+  * On failure the channel and the event loop group are released before the recorded
+  * IOException is thrown. On success any error that was held back while connecting is
+  * fired into the channel pipeline.
+  *
+  * @throws IOException if the connection attempt fails or the calling thread is interrupted.
+  * @throws IllegalStateException if no transport listener has been set.
+  */
+ @Override
+ public void connect() throws IOException {
+
+    if (listener == null) {
+       throw new IllegalStateException("A transport listener must be set before connection attempts.");
+    }
+
+    group = new NioEventLoopGroup(1);
+
+    bootstrap = new Bootstrap();
+    bootstrap.group(group);
+    bootstrap.channel(NioSocketChannel.class);
+    bootstrap.handler(new ChannelInitializer<Channel>() {
+
+       @Override
+       public void initChannel(Channel connectedChannel) throws Exception {
+          configureChannel(connectedChannel);
+       }
+    });
+
+    configureNetty(bootstrap, getTransportOptions());
+
+    try {
+       final ChannelFuture connectFuture = bootstrap.connect(getRemoteHost(), getRemotePort());
+       connectFuture.addListener(new ChannelFutureListener() {
+
+          @Override
+          public void operationComplete(ChannelFuture future) throws Exception {
+             if (future.isSuccess()) {
+                handleConnected(future.channel());
+             }
+             else if (future.isCancelled()) {
+                connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+             }
+             else {
+                connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+             }
+          }
+       });
+
+       // Use await() and not sync(): sync() rethrows the cause of a failed connect which
+       // would skip the cleanup below and leave the event loop group running.
+       connectFuture.await();
+
+       if (connectFuture.isSuccess()) {
+          // Now wait for WS protocol level handshake completion
+          handshakeFuture.await();
+       }
+       else if (failureCause == null) {
+          // The listener above may not have run yet, record the failure here so that
+          // the resources are released and the error is reported to the caller.
+          failureCause = IOExceptionSupport.create(connectFuture.cause());
+       }
+    }
+    catch (InterruptedException ex) {
+       LOG.debug("Transport connection attempt was interrupted.");
+       // Restore the interrupt status so that callers can still observe the interruption.
+       Thread.currentThread().interrupt();
+       failureCause = IOExceptionSupport.create(ex);
+    }
+
+    if (failureCause != null) {
+       // Close out any Netty resources now as they are no longer needed.
+       if (channel != null) {
+          channel.close().syncUninterruptibly();
+          channel = null;
+       }
+       if (group != null) {
+          group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+          group = null;
+       }
+
+       throw failureCause;
+    }
+    else {
+       // Connected, allow any held async error to fire now and close the transport.
+       channel.eventLoop().execute(new Runnable() {
+
+          @Override
+          public void run() {
+             if (pendingFailure != null) {
+                channel.pipeline().fireExceptionCaught(pendingFailure);
+             }
+          }
+       });
+    }
+ }
+
+ /**
+  * @return the current value of the connected flag.
+  */
+ @Override
+ public boolean isConnected() {
+    final boolean current = connected.get();
+    return current;
+ }
+
+ /**
+  * @return true when this transport was created for a "wss" remote location.
+  */
+ @Override
+ public boolean isSSL() {
+    return this.secure;
+ }
+
+ /**
+  * Closes the transport. Only the first call has an effect: it clears the connected
+  * flag, closes the channel if there is one and starts the shutdown of the event loop
+  * group if there is one.
+  *
+  * @throws IOException declared by the transport contract.
+  */
+ @Override
+ public void close() throws IOException {
+    if (!closed.compareAndSet(false, true)) {
+       return;
+    }
+
+    connected.set(false);
+
+    if (channel != null) {
+       channel.close().syncUninterruptibly();
+    }
+
+    if (group != null) {
+       group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+    }
+ }
+
+ /**
+  * Requests an IO buffer from the channel's allocator, using the given size as both
+  * the initial and the maximum capacity.
+  *
+  * @param size the capacity of the buffer.
+  * @return the newly allocated buffer.
+  * @throws IOException if the transport is not connected.
+  */
+ @Override
+ public ByteBuf allocateSendBuffer(int size) throws IOException {
+    checkConnected();
+
+    final ByteBuf buffer = channel.alloc().ioBuffer(size, size);
+    return buffer;
+ }
+
+ /**
+  * Writes the readable bytes of the given buffer to the channel as a single binary
+  * WebSocket frame and flushes. A buffer without readable bytes is ignored.
+  *
+  * @param output the buffer holding the bytes to send.
+  * @throws IOException if the transport is not connected.
+  */
+ @Override
+ public void send(ByteBuf output) throws IOException {
+    checkConnected();
+
+    final int pending = output.readableBytes();
+    if (pending != 0) {
+       LOG.trace("Attempted write of: {} bytes", pending);
+
+       channel.writeAndFlush(new BinaryWebSocketFrame(output));
+    }
+ }
+
+ /**
+  * @return the listener currently assigned to this transport, may be null.
+  */
+ @Override
+ public NettyTransportListener getTransportListener() {
+    return this.listener;
+ }
+
+ /**
+  * @param listener the listener that should receive the events of this transport.
+  */
+ @Override
+ public void setTransportListener(final NettyTransportListener listener) {
+    this.listener = listener;
+ }
+
+ @Override
+ public NettyTransportOptions getTransportOptions() {
+ if (options == null) {
+ if (isSSL()) {
+ options = NettyTransportSslOptions.INSTANCE;
+ }
+ else {
+ options = NettyTransportOptions.INSTANCE;
+ }
+ }
+
+ return options;
+ }
+
+ /**
+  * @return the URI this transport was created with.
+  */
+ @Override
+ public URI getRemoteLocation() {
+    return this.remote;
+ }
+
+ /**
+  * Looks up the SslHandler in the channel pipeline and returns the local principal of
+  * its SSL session.
+  *
+  * @return the local principal reported by the SSL session.
+  * @throws UnsupportedOperationException if this transport is not secure.
+  */
+ @Override
+ public Principal getLocalPrincipal() {
+    if (isSSL()) {
+       final SslHandler handler = channel.pipeline().get(SslHandler.class);
+       return handler.engine().getSession().getLocalPrincipal();
+    }
+
+    throw new UnsupportedOperationException("Not connected to a secure channel");
+ }
+
+ //----- Internal implementation details, can be overridden as needed --//
+
+ protected String getRemoteHost() {
+ return remote.getHost();
+ }
+
+ protected int getRemotePort() {
+ int port = remote.getPort();
+
+ if (port <= 0) {
+ if (isSSL()) {
+ port = getSslOptions().getDefaultSslPort();
+ }
+ else {
+ port = getTransportOptions().getDefaultTcpPort();
+ }
+ }
+
+ return port;
+ }
+
+ protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+ bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+ bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+ bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+ if (options.getSendBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+ }
+
+ if (options.getReceiveBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+ bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+ }
+
+ if (options.getTrafficClass() != -1) {
+ bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+ }
+ }
+
+ protected void configureChannel(final Channel channel) throws Exception {
+ if (isSSL()) {
+ SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+ sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+ @Override
+ public void operationComplete(Future<Channel> future) throws Exception {
+ if (future.isSuccess()) {
+ LOG.trace("SSL Handshake has completed: {}", channel);
+ connectionEstablished(channel);
+ }
+ else {
+ LOG.trace("SSL Handshake has failed: {}", channel);
+ connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+ }
+ }
+ });
+
+ channel.pipeline().addLast(sslHandler);
+ }
+
+ channel.pipeline().addLast(new HttpClientCodec());
+ channel.pipeline().addLast(new HttpObjectAggregator(8192));
+ channel.pipeline().addLast(new NettyTcpTransportHandler());
+ }
+
+ /**
+  * Invoked once the connect attempt succeeded. A non secure transport is marked as
+  * established right away, a secure one is left to the SSL handshake listener.
+  *
+  * @param channel the channel that has been connected.
+  * @throws Exception declared for overriding implementations.
+  */
+ protected void handleConnected(final Channel channel) throws Exception {
+    if (isSSL()) {
+       return;
+    }
+
+    connectionEstablished(channel);
+ }
+
+ //----- State change handlers and checks ---------------------------------//
+
+ /**
+  * Called when the transport has successfully connected: remembers the channel and
+  * raises the connected flag.
+  *
+  * @param connectedChannel the Channel instance that is now connected.
+  */
+ protected void connectionEstablished(Channel connectedChannel) {
+    LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
+    this.channel = connectedChannel;
+    this.connected.set(true);
+ }
+
+ /**
+  * Called when the transport connection failed: records the failure so that connect()
+  * can report it, remembers the failed channel, clears the connected flag and fails
+  * the handshake promise.
+  *
+  * @param failedChannel The Channel instance that failed.
+  * @param cause         An IOException that describes the cause of the failed connection.
+  */
+ protected void connectionFailed(Channel failedChannel, IOException cause) {
+    this.failureCause = IOExceptionSupport.create(cause);
+    this.channel = failedChannel;
+    this.connected.set(false);
+    this.handshakeFuture.setFailure(cause);
+ }
+
+ private NettyTransportSslOptions getSslOptions() {
+ return (NettyTransportSslOptions) getTransportOptions();
+ }
+
+ /**
+  * Guards operations that need a connected transport.
+  *
+  * @throws IOException if the connected flag is not set.
+  */
+ private void checkConnected() throws IOException {
+    if (connected.get()) {
+       return;
+    }
+
+    throw new IOException("Cannot send to a non-connected transport.");
+ }
+
+ //----- Handle connection events -----------------------------------------//
+
+ /**
+  * Channel handler that performs the WebSocket client handshake and, once it has
+  * completed, dispatches inbound WebSocket frames and channel state changes to
+  * the transport listener.
+  */
+ private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
+
+    private final WebSocketClientHandshaker handshaker;
+
+    NettyTcpTransportHandler() {
+       // WebSocket version 13 handshake against the remote location, requesting the "amqp" sub-protocol.
+       handshaker = WebSocketClientHandshakerFactory.newHandshaker(remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
+    }
+
+    /**
+     * Creates the promise that tracks the outcome of the WebSocket handshake.
+     */
+    @Override
+    public void handlerAdded(ChannelHandlerContext context) {
+       LOG.trace("Handler has become added! Channel is {}", context.channel());
+       handshakeFuture = context.newPromise();
+    }
+
+    /**
+     * Starts the WebSocket handshake as soon as the channel becomes active.
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext context) throws Exception {
+       LOG.trace("Channel has become active! Channel is {}", context.channel());
+       handshaker.handshake(context.channel());
+    }
+
+    /**
+     * Notifies the listener that the transport closed, but only when the transport
+     * was connected and has not been closed locally.
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext context) throws Exception {
+       LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+       if (connected.compareAndSet(true, false) && !closed.get()) {
+          LOG.trace("Firing onTransportClosed listener");
+          listener.onTransportClosed();
+       }
+    }
+
+    /**
+     * Handles an error raised on the channel. When the transport is connected (and
+     * not closed) the listener is told about the error, preferring a previously held
+     * failure over the new one. Otherwise the first error seen is held for later
+     * dispatch and a still-pending handshake promise is failed.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+       LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
+       LOG.trace("Error Stack: ", cause);
+       if (connected.compareAndSet(true, false) && !closed.get()) {
+          LOG.trace("Firing onTransportError listener");
+          if (pendingFailure != null) {
+             listener.onTransportError(pendingFailure);
+          }
+          else {
+             listener.onTransportError(cause);
+          }
+       }
+       else {
+          // Hold the first failure for later dispatch if connect succeeds.
+          // This will then trigger disconnect using the first error reported.
+          // Only store the cause when nothing is held yet, so that later errors
+          // cannot replace the first one.
+          if (pendingFailure == null) {
+             LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+             pendingFailure = cause;
+          }
+
+          if (!handshakeFuture.isDone()) {
+             handshakeFuture.setFailure(cause);
+          }
+       }
+    }
+
+    /**
+     * Processes an inbound message. Until the handshake has completed the message is
+     * treated as the HTTP handshake response; afterwards it is treated as a WebSocket
+     * frame and handled according to its type.
+     */
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
+       LOG.trace("New data read: incoming: {}", message);
+
+       Channel ch = ctx.channel();
+       if (!handshaker.isHandshakeComplete()) {
+          handshaker.finishHandshake(ch, (FullHttpResponse) message);
+          LOG.info("WebSocket Client connected! {}", ctx.channel());
+          handshakeFuture.setSuccess();
+          return;
+       }
+
+       // We shouldn't get this since we handle the handshake previously.
+       if (message instanceof FullHttpResponse) {
+          FullHttpResponse response = (FullHttpResponse) message;
+          throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
+                                             ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
+       }
+
+       WebSocketFrame frame = (WebSocketFrame) message;
+       if (frame instanceof TextWebSocketFrame) {
+          // Text frames are rejected and reported as an error on the pipeline.
+          TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
+          LOG.warn("WebSocket Client received message: " + textFrame.text());
+          ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
+       }
+       else if (frame instanceof BinaryWebSocketFrame) {
+          BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
+          LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
+          // NOTE(review): SimpleChannelInboundHandler releases the message after this
+          // method returns by default, so the listener presumably has to consume or
+          // retain the content synchronously - confirm against the listener contract.
+          listener.onData(binaryFrame.content());
+       }
+       else if (frame instanceof PongWebSocketFrame) {
+          LOG.trace("WebSocket Client received pong");
+       }
+       else if (frame instanceof CloseWebSocketFrame) {
+          LOG.trace("WebSocket Client received closing");
+          ch.close();
+       }
+    }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
new file mode 100644
index 0000000..c3c4286
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/PartialPooledByteBufAllocator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.UnpooledByteBufAllocator;
+
+/**
+ * A {@link ByteBufAllocator} which is only partially pooled: direct
+ * {@link ByteBuf}s (plain and composite) come from a pooled allocator, while
+ * every other request, including the generic {@code buffer} and {@code ioBuffer}
+ * variants, is served with unpooled heap buffers.
+ */
+public class PartialPooledByteBufAllocator implements ByteBufAllocator {
+
+   // Both delegates are created with preferDirect = false.
+   private static final ByteBufAllocator POOLED = new PooledByteBufAllocator(false);
+   private static final ByteBufAllocator UNPOOLED = new UnpooledByteBufAllocator(false);
+
+   /** The shared allocator instance; the constructor is private. */
+   public static final PartialPooledByteBufAllocator INSTANCE = new PartialPooledByteBufAllocator();
+
+   private PartialPooledByteBufAllocator() {
+   }
+
+   /** @return an unpooled heap buffer. */
+   @Override
+   public ByteBuf buffer() {
+      return UNPOOLED.heapBuffer();
+   }
+
+   /** @return an unpooled heap buffer with the given initial capacity. */
+   @Override
+   public ByteBuf buffer(int initialCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   /** @return an unpooled heap buffer with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf buffer(int initialCapacity, int maxCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** @return an unpooled heap buffer; I/O buffers are deliberately not direct here. */
+   @Override
+   public ByteBuf ioBuffer() {
+      return UNPOOLED.heapBuffer();
+   }
+
+   /** @return an unpooled heap buffer with the given initial capacity. */
+   @Override
+   public ByteBuf ioBuffer(int initialCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   /** @return an unpooled heap buffer with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** @return an unpooled heap buffer. */
+   @Override
+   public ByteBuf heapBuffer() {
+      return UNPOOLED.heapBuffer();
+   }
+
+   /** @return an unpooled heap buffer with the given initial capacity. */
+   @Override
+   public ByteBuf heapBuffer(int initialCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity);
+   }
+
+   /** @return an unpooled heap buffer with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+      return UNPOOLED.heapBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** @return a direct buffer from the pooled allocator. */
+   @Override
+   public ByteBuf directBuffer() {
+      return POOLED.directBuffer();
+   }
+
+   /** @return a direct buffer from the pooled allocator with the given initial capacity. */
+   @Override
+   public ByteBuf directBuffer(int initialCapacity) {
+      return POOLED.directBuffer(initialCapacity);
+   }
+
+   /** @return a direct buffer from the pooled allocator with the given initial and maximum capacity. */
+   @Override
+   public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
+      return POOLED.directBuffer(initialCapacity, maxCapacity);
+   }
+
+   /** @return an unpooled composite heap buffer. */
+   @Override
+   public CompositeByteBuf compositeBuffer() {
+      return UNPOOLED.compositeHeapBuffer();
+   }
+
+   /** @return an unpooled composite heap buffer limited to the given number of components. */
+   @Override
+   public CompositeByteBuf compositeBuffer(int maxNumComponents) {
+      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
+   }
+
+   /** @return an unpooled composite heap buffer. */
+   @Override
+   public CompositeByteBuf compositeHeapBuffer() {
+      return UNPOOLED.compositeHeapBuffer();
+   }
+
+   /** @return an unpooled composite heap buffer limited to the given number of components. */
+   @Override
+   public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+      return UNPOOLED.compositeHeapBuffer(maxNumComponents);
+   }
+
+   /** @return a composite direct buffer from the pooled allocator. */
+   @Override
+   public CompositeByteBuf compositeDirectBuffer() {
+      return POOLED.compositeDirectBuffer();
+   }
+
+   /** @return a composite direct buffer from the pooled allocator limited to the given number of components. */
+   @Override
+   public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
+      // Forward the component limit instead of silently dropping it.
+      return POOLED.compositeDirectBuffer(maxNumComponents);
+   }
+
+   /** @return always {@code true}, since direct buffers come from the pooled allocator. */
+   @Override
+   public boolean isDirectBufferPooled() {
+      return true;
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
new file mode 100644
index 0000000..42d6a0b
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.X509ExtendedKeyManager;
+import java.net.Socket;
+import java.security.Principal;
+import java.security.PrivateKey;
+import java.security.cert.X509Certificate;
+
+/**
+ * An X509ExtendedKeyManager decorator pinned to a single key alias. Every
+ * alias selection answers with the configured alias, while certificate chain
+ * and private key lookups are forwarded to the wrapped key manager.
+ */
+public class X509AliasKeyManager extends X509ExtendedKeyManager {
+
+   private final X509ExtendedKeyManager delegate;
+   private final String alias;
+
+   /**
+    * @param alias the only alias this key manager will ever choose or report.
+    * @param delegate the key manager used to look up certificate chains and private keys.
+    * @throws IllegalArgumentException if the given alias is null.
+    */
+   public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException {
+      if (alias == null) {
+         throw new IllegalArgumentException("The given key alias must not be null.");
+      }
+
+      this.delegate = delegate;
+      this.alias = alias;
+   }
+
+   @Override
+   public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
+      return this.alias;
+   }
+
+   @Override
+   public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
+      return this.alias;
+   }
+
+   @Override
+   public X509Certificate[] getCertificateChain(String alias) {
+      // Uses the alias passed by the caller, not the configured one.
+      return delegate.getCertificateChain(alias);
+   }
+
+   @Override
+   public String[] getClientAliases(String keyType, Principal[] issuers) {
+      return configuredAliasOnly();
+   }
+
+   @Override
+   public PrivateKey getPrivateKey(String alias) {
+      // Uses the alias passed by the caller, not the configured one.
+      return delegate.getPrivateKey(alias);
+   }
+
+   @Override
+   public String[] getServerAliases(String keyType, Principal[] issuers) {
+      return configuredAliasOnly();
+   }
+
+   @Override
+   public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
+      return this.alias;
+   }
+
+   @Override
+   public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
+      return this.alias;
+   }
+
+   /**
+    * @return a fresh one-element array holding the configured alias.
+    */
+   private String[] configuredAliasOnly() {
+      return new String[]{this.alias};
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
new file mode 100644
index 0000000..bb71746
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/AsyncResult.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Defines a result interface for asynchronous operations. The outcome of an
+ * operation is reported through either {@link #onFailure(Throwable)} or
+ * {@link #onSuccess()}, and {@link #isComplete()} tells whether an outcome
+ * has been reported yet.
+ */
+public interface AsyncResult {
+
+ /**
+ * Invoked when the operation fails, with the error that caused the failure.
+ *
+ * @param result The error that resulted in this asynchronous operation failing.
+ */
+ void onFailure(Throwable result);
+
+ /**
+ * Invoked when the operation succeeds. No result value is passed; the call
+ * itself is the signal, and implementations are expected to notify any
+ * waiting parties.
+ */
+ void onSuccess();
+
+ /**
+ * Returns true if the AsyncResult has completed. The task is considered complete
+ * regardless of whether it succeeded or failed.
+ *
+ * @return true if the asynchronous operation has completed.
+ */
+ boolean isComplete();
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
new file mode 100644
index 0000000..12d38fd
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFuture.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Asynchronous Client Future class. The future completes at most once, via
+ * either {@link #onSuccess()} or {@link #onFailure(Throwable)}; threads blocked
+ * in one of the {@code sync} methods are released when that happens. An optional
+ * {@link ClientFutureSynchronization} is called back just before the waiters
+ * are released.
+ */
+public class ClientFuture implements AsyncResult {
+
+   // Ensures that only the first completion attempt takes effect.
+   private final AtomicBoolean completer = new AtomicBoolean();
+   private final CountDownLatch latch = new CountDownLatch(1);
+   private final ClientFutureSynchronization synchronization;
+   private volatile Throwable error;
+
+   /**
+    * Creates a future without a synchronization callback.
+    */
+   public ClientFuture() {
+      this(null);
+   }
+
+   /**
+    * Creates a future that notifies the given callback on completion.
+    *
+    * @param synchronization callback invoked on completion, may be null.
+    */
+   public ClientFuture(ClientFutureSynchronization synchronization) {
+      this.synchronization = synchronization;
+   }
+
+   @Override
+   public boolean isComplete() {
+      return latch.getCount() == 0;
+   }
+
+   /**
+    * Completes this future with an error. Calls made after the future has
+    * already completed are ignored.
+    *
+    * @param result the error that is later rethrown by the {@code sync} methods.
+    */
+   @Override
+   public void onFailure(Throwable result) {
+      if (completer.compareAndSet(false, true)) {
+         error = result;
+         try {
+            if (synchronization != null) {
+               synchronization.onPendingFailure(error);
+            }
+         }
+         finally {
+            // Always release the waiters, even if the callback throws.
+            latch.countDown();
+         }
+      }
+   }
+
+   /**
+    * Completes this future successfully. Calls made after the future has
+    * already completed are ignored.
+    */
+   @Override
+   public void onSuccess() {
+      if (completer.compareAndSet(false, true)) {
+         try {
+            if (synchronization != null) {
+               synchronization.onPendingSuccess();
+            }
+         }
+         finally {
+            // Always release the waiters, even if the callback throws.
+            latch.countDown();
+         }
+      }
+   }
+
+   /**
+    * Timed wait for a response to a pending operation. If the wait period
+    * elapses before the future completes and no error has been recorded, this
+    * method returns normally; the timeout itself is not reported.
+    *
+    * @param amount The amount of time to wait before abandoning the wait.
+    * @param unit The unit to use for this wait period.
+    * @throws IOException if the operation failed or the wait was interrupted.
+    */
+   public void sync(long amount, TimeUnit unit) throws IOException {
+      try {
+         latch.await(amount, unit);
+      }
+      catch (InterruptedException e) {
+         // Restore the interrupt status so code further up the stack can see it.
+         Thread.currentThread().interrupt();
+         throw IOExceptionSupport.create(e);
+      }
+
+      failOnError();
+   }
+
+   /**
+    * Waits for a response to some pending operation.
+    *
+    * @throws IOException if the operation failed or the wait was interrupted.
+    */
+   public void sync() throws IOException {
+      try {
+         latch.await();
+      }
+      catch (InterruptedException e) {
+         // Restore the interrupt status so code further up the stack can see it.
+         Thread.currentThread().interrupt();
+         throw IOExceptionSupport.create(e);
+      }
+
+      failOnError();
+   }
+
+   /**
+    * Rethrows the recorded error, if any, as an IOException.
+    */
+   private void failOnError() throws IOException {
+      Throwable cause = error;
+      if (cause != null) {
+         throw IOExceptionSupport.create(cause);
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
new file mode 100644
index 0000000..e279bc1
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/ClientFutureSynchronization.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+/**
+ * Synchronization callback interface used to execute state updates
+ * or similar tasks in the thread context where the associated
+ * ClientFuture is completed. ClientFuture invokes the matching
+ * callback before it releases any threads waiting on the future.
+ */
+public interface ClientFutureSynchronization {
+
+ /**
+ * Invoked when the associated future is completed successfully.
+ */
+ void onPendingSuccess();
+
+ /**
+ * Invoked when the associated future is completed with an error.
+ *
+ * @param cause the error the future was failed with.
+ */
+ void onPendingFailure(Throwable cause);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/df41a60e/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java
new file mode 100644
index 0000000..70d88e6
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/util/IOExceptionSupport.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.util;
+
+import java.io.IOException;
+
+/**
+ * Helper for turning arbitrary Throwables into IOException instances.
+ */
+public class IOExceptionSupport {
+
+   /**
+    * Returns the given cause unchanged when it already is an IOException.
+    * Otherwise wraps it in a new IOException whose message is the cause's
+    * message, or the cause's string form when that message is null or empty.
+    *
+    * @param cause The initiating exception that should be cast or wrapped.
+    * @return an IOException instance.
+    */
+   public static IOException create(Throwable cause) {
+      if (!(cause instanceof IOException)) {
+         final String description = cause.getMessage();
+         final boolean missing = description == null || description.isEmpty();
+         return new IOException(missing ? cause.toString() : description, cause);
+      }
+
+      return (IOException) cause;
+   }
+}