You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/13 22:02:43 UTC
[4/8] activemq-artemis git commit: ARTEMIS-1511 Refactor AMQP
Transport for use with other test clients
ARTEMIS-1511 Refactor AMQP Transport for use with other test clients
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5211afdf
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5211afdf
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5211afdf
Branch: refs/heads/master
Commit: 5211afdf866fbcb5b538b2d5e2670dd5df385423
Parents: 63b156e
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Nov 10 12:31:29 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 13 16:55:47 2017 -0500
----------------------------------------------------------------------
.../transport/amqp/client/AmqpClient.java | 4 +-
.../transport/amqp/client/AmqpConnection.java | 7 +-
.../client/transport/NettyTcpTransport.java | 460 -------------------
.../amqp/client/transport/NettyTransport.java | 56 ---
.../client/transport/NettyTransportFactory.java | 83 ----
.../transport/NettyTransportListener.java | 48 --
.../client/transport/NettyTransportOptions.java | 208 ---------
.../transport/NettyTransportSslOptions.java | 302 ------------
.../client/transport/NettyTransportSupport.java | 304 ------------
.../amqp/client/transport/NettyWSTransport.java | 171 -------
.../client/transport/X509AliasKeyManager.java | 86 ----
.../transport/netty/NettyTcpTransport.java | 460 +++++++++++++++++++
.../transport/netty/NettyTransport.java | 57 +++
.../transport/netty/NettyTransportFactory.java | 82 ++++
.../transport/netty/NettyTransportListener.java | 48 ++
.../transport/netty/NettyTransportOptions.java | 219 +++++++++
.../netty/NettyTransportSslOptions.java | 302 ++++++++++++
.../transport/netty/NettyTransportSupport.java | 304 ++++++++++++
.../transport/netty/NettyWSTransport.java | 172 +++++++
.../transport/netty/X509AliasKeyManager.java | 86 ++++
.../impl/netty/NettyHandshakeTimeoutTest.java | 6 +-
21 files changed, 1739 insertions(+), 1726 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index fddaf9d..d35d0ab 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -21,8 +21,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
+import org.apache.activemq.transport.netty.NettyTransport;
+import org.apache.activemq.transport.netty.NettyTransportFactory;
import org.apache.qpid.proton.amqp.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index 2fc720a..01e2288 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -33,8 +33,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.InactivityIOException;
+import org.apache.activemq.transport.netty.NettyTransport;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
+import org.apache.activemq.transport.netty.NettyTransportListener;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
import org.apache.activemq.transport.amqp.client.util.IdGenerator;
@@ -80,7 +81,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final AtomicLong sessionIdGenerator = new AtomicLong();
private final AtomicLong txIdGenerator = new AtomicLong();
private final Collector protonCollector = new CollectorImpl();
- private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport;
+ private final NettyTransport transport;
private final Transport protonTransport = Transport.Factory.create();
private final String username;
@@ -109,7 +110,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private boolean trace;
private boolean noContainerID = false;
- public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
+ public AmqpConnection(NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
getEndpoint().collect(protonCollector);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
deleted file mode 100644
index 7ce3bb9..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.security.Principal;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.logging.LoggingHandler;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
-/**
- * TCP based transport that uses Netty as the underlying IO layer.
- */
-public class NettyTcpTransport implements NettyTransport {
-
- private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
-
- private static final int SHUTDOWN_TIMEOUT = 100;
- public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
-
- protected Bootstrap bootstrap;
- protected EventLoopGroup group;
- protected Channel channel;
- protected NettyTransportListener listener;
- protected final NettyTransportOptions options;
- protected final URI remote;
- protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
-
- private final AtomicBoolean connected = new AtomicBoolean();
- private final AtomicBoolean closed = new AtomicBoolean();
- private final CountDownLatch connectLatch = new CountDownLatch(1);
- private volatile IOException failureCause;
-
- /**
- * Create a new transport instance
- *
- * @param remoteLocation
- * the URI that defines the remote resource to connect to.
- * @param options
- * the transport options used to configure the socket connection.
- */
- public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
- this(null, remoteLocation, options);
- }
-
- /**
- * Create a new transport instance
- *
- * @param listener
- * the TransportListener that will receive events from this Transport.
- * @param remoteLocation
- * the URI that defines the remote resource to connect to.
- * @param options
- * the transport options used to configure the socket connection.
- */
- public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
- if (options == null) {
- throw new IllegalArgumentException("Transport Options cannot be null");
- }
-
- if (remoteLocation == null) {
- throw new IllegalArgumentException("Transport remote location cannot be null");
- }
-
- this.options = options;
- this.listener = listener;
- this.remote = remoteLocation;
- }
-
- @Override
- public void connect() throws IOException {
-
- if (listener == null) {
- throw new IllegalStateException("A transport listener must be set before connection attempts.");
- }
-
- final SslHandler sslHandler;
- if (isSSL()) {
- try {
- sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
- } catch (Exception ex) {
- // TODO: can we stop it throwing Exception?
- throw IOExceptionSupport.create(ex);
- }
- } else {
- sslHandler = null;
- }
-
- group = new NioEventLoopGroup(1);
-
- bootstrap = new Bootstrap();
- bootstrap.group(group);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<Channel>() {
- @Override
- public void initChannel(Channel connectedChannel) throws Exception {
- configureChannel(connectedChannel, sslHandler);
- }
- });
-
- configureNetty(bootstrap, getTransportOptions());
-
- ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
- future.addListener(new ChannelFutureListener() {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- handleException(future.channel(), IOExceptionSupport.create(future.cause()));
- }
- }
- });
-
- try {
- connectLatch.await();
- } catch (InterruptedException ex) {
- LOG.debug("Transport connection was interrupted.");
- Thread.interrupted();
- failureCause = IOExceptionSupport.create(ex);
- }
-
- if (failureCause != null) {
- // Close out any Netty resources now as they are no longer needed.
- if (channel != null) {
- channel.close().syncUninterruptibly();
- channel = null;
- }
- if (group != null) {
- Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
- if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
- LOG.trace("Channel group shutdown failed to complete in allotted time");
- }
- group = null;
- }
-
- throw failureCause;
- } else {
- // Connected, allow any held async error to fire now and close the transport.
- channel.eventLoop().execute(new Runnable() {
-
- @Override
- public void run() {
- if (failureCause != null) {
- channel.pipeline().fireExceptionCaught(failureCause);
- }
- }
- });
- }
- }
-
- @Override
- public boolean isConnected() {
- return connected.get();
- }
-
- @Override
- public boolean isSSL() {
- return options.isSSL();
- }
-
- @Override
- public void close() throws IOException {
- if (closed.compareAndSet(false, true)) {
- connected.set(false);
- try {
- if (channel != null) {
- channel.close().syncUninterruptibly();
- }
- } finally {
- if (group != null) {
- Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
- if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
- LOG.trace("Channel group shutdown failed to complete in allotted time");
- }
- }
- }
- }
- }
-
- @Override
- public ByteBuf allocateSendBuffer(int size) throws IOException {
- checkConnected();
- return channel.alloc().ioBuffer(size, size);
- }
-
- @Override
- public void send(ByteBuf output) throws IOException {
- checkConnected();
- int length = output.readableBytes();
- if (length == 0) {
- return;
- }
-
- LOG.trace("Attempted write of: {} bytes", length);
-
- channel.writeAndFlush(output);
- }
-
- @Override
- public NettyTransportListener getTransportListener() {
- return listener;
- }
-
- @Override
- public void setTransportListener(NettyTransportListener listener) {
- this.listener = listener;
- }
-
- @Override
- public NettyTransportOptions getTransportOptions() {
- return options;
- }
-
- @Override
- public URI getRemoteLocation() {
- return remote;
- }
-
- @Override
- public Principal getLocalPrincipal() {
- Principal result = null;
-
- if (isSSL()) {
- SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
- result = sslHandler.engine().getSession().getLocalPrincipal();
- }
-
- return result;
- }
-
- @Override
- public void setMaxFrameSize(int maxFrameSize) {
- if (connected.get()) {
- throw new IllegalStateException("Cannot change Max Frame Size while connected.");
- }
-
- this.maxFrameSize = maxFrameSize;
- }
-
- @Override
- public int getMaxFrameSize() {
- return maxFrameSize;
- }
-
- // ----- Internal implementation details, can be overridden as needed -----//
-
- protected String getRemoteHost() {
- return remote.getHost();
- }
-
- protected int getRemotePort() {
- if (remote.getPort() != -1) {
- return remote.getPort();
- } else {
- return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
- }
- }
-
- protected void addAdditionalHandlers(ChannelPipeline pipeline) {
-
- }
-
- protected ChannelInboundHandlerAdapter createChannelHandler() {
- return new NettyTcpTransportHandler();
- }
-
- // ----- Event Handlers which can be overridden in subclasses -------------//
-
- protected void handleConnected(Channel channel) throws Exception {
- LOG.trace("Channel has become active! Channel is {}", channel);
- connectionEstablished(channel);
- }
-
- protected void handleChannelInactive(Channel channel) throws Exception {
- LOG.trace("Channel has gone inactive! Channel is {}", channel);
- if (connected.compareAndSet(true, false) && !closed.get()) {
- LOG.trace("Firing onTransportClosed listener");
- listener.onTransportClosed();
- }
- }
-
- protected void handleException(Channel channel, Throwable cause) throws Exception {
- LOG.trace("Exception on channel! Channel is {}", channel);
- if (connected.compareAndSet(true, false) && !closed.get()) {
- LOG.trace("Firing onTransportError listener");
- if (failureCause != null) {
- listener.onTransportError(failureCause);
- } else {
- listener.onTransportError(cause);
- }
- } else {
- // Hold the first failure for later dispatch if connect succeeds.
- // This will then trigger disconnect using the first error reported.
- if (failureCause == null) {
- LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
- failureCause = IOExceptionSupport.create(cause);
- }
-
- connectionFailed(channel, failureCause);
- }
- }
-
- // ----- State change handlers and checks ---------------------------------//
-
- protected final void checkConnected() throws IOException {
- if (!connected.get()) {
- throw new IOException("Cannot send to a non-connected transport.");
- }
- }
-
- /*
- * Called when the transport has successfully connected and is ready for use.
- */
- private void connectionEstablished(Channel connectedChannel) {
- channel = connectedChannel;
- connected.set(true);
- connectLatch.countDown();
- }
-
- /*
- * Called when the transport connection failed and an error should be returned.
- */
- private void connectionFailed(Channel failedChannel, IOException cause) {
- failureCause = cause;
- channel = failedChannel;
- connected.set(false);
- connectLatch.countDown();
- }
-
- private NettyTransportSslOptions getSslOptions() {
- return (NettyTransportSslOptions) getTransportOptions();
- }
-
- private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
- bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
- bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
- bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
-
- if (options.getSendBufferSize() != -1) {
- bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
- }
-
- if (options.getReceiveBufferSize() != -1) {
- bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
- bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
- }
-
- if (options.getTrafficClass() != -1) {
- bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
- }
- }
-
- private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
- if (isSSL()) {
- channel.pipeline().addLast(sslHandler);
- }
-
- if (getTransportOptions().isTraceBytes()) {
- channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
- }
-
- addAdditionalHandlers(channel.pipeline());
-
- channel.pipeline().addLast(createChannelHandler());
- }
-
- // ----- Handle connection events -----------------------------------------//
-
- protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
-
- @Override
- public void channelRegistered(ChannelHandlerContext context) throws Exception {
- channel = context.channel();
- }
-
- @Override
- public void channelActive(ChannelHandlerContext context) throws Exception {
- // In the Secure case we need to let the handshake complete before we
- // trigger the connected event.
- if (!isSSL()) {
- handleConnected(context.channel());
- } else {
- SslHandler sslHandler = context.pipeline().get(SslHandler.class);
- sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
- @Override
- public void operationComplete(Future<Channel> future) throws Exception {
- if (future.isSuccess()) {
- LOG.trace("SSL Handshake has completed: {}", channel);
- handleConnected(channel);
- } else {
- LOG.trace("SSL Handshake has failed: {}", channel);
- handleException(channel, future.cause());
- }
- }
- });
- }
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext context) throws Exception {
- handleChannelInactive(context.channel());
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
- handleException(context.channel(), cause);
- }
- }
-
- // ----- Handle Binary data from connection -------------------------------//
-
- protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
- LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
- listener.onData(buffer);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
deleted file mode 100644
index 4d5a389..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.security.Principal;
-
-import io.netty.buffer.ByteBuf;
-
-/**
- * Base for all Netty based Transports in this client.
- */
-public interface NettyTransport {
-
- void connect() throws IOException;
-
- boolean isConnected();
-
- boolean isSSL();
-
- void close() throws IOException;
-
- ByteBuf allocateSendBuffer(int size) throws IOException;
-
- void send(ByteBuf output) throws IOException;
-
- NettyTransportListener getTransportListener();
-
- void setTransportListener(NettyTransportListener listener);
-
- NettyTransportOptions getTransportOptions();
-
- URI getRemoteLocation();
-
- Principal getLocalPrincipal();
-
- void setMaxFrameSize(int maxFrameSize);
-
- int getMaxFrameSize();
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
deleted file mode 100644
index 30b2e21..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.net.URI;
-import java.util.Map;
-
-import org.apache.activemq.transport.amqp.client.util.PropertyUtil;
-
-/**
- * Factory for creating the Netty based TCP Transport.
- */
-public final class NettyTransportFactory {
-
- private NettyTransportFactory() {
- }
-
- /**
- * Creates an instance of the given Transport and configures it using the properties set on
- * the given remote broker URI.
- *
- * @param remoteURI
- * The URI used to connect to a remote Peer.
- *
- * @return a new Transport instance.
- *
- * @throws Exception
- * if an error occurs while creating the Transport instance.
- */
- public static NettyTransport createTransport(URI remoteURI) throws Exception {
- Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
- Map<String, String> transportURIOptions = PropertyUtil.filterProperties(map, "transport.");
- NettyTransportOptions transportOptions = null;
-
- remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
-
- if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) {
- transportOptions = NettyTransportOptions.INSTANCE.clone();
- } else {
- transportOptions = NettyTransportSslOptions.INSTANCE.clone();
- }
-
- Map<String, String> unused = PropertyUtil.setProperties(transportOptions, transportURIOptions);
- if (!unused.isEmpty()) {
- String msg = " Not all transport options could be set on the TCP based" +
- " Transport. Check the options are spelled correctly." +
- " Unused parameters=[" + unused + "]." +
- " This provider instance cannot be started.";
- throw new IllegalArgumentException(msg);
- }
-
- NettyTransport result = null;
-
- switch (remoteURI.getScheme().toLowerCase()) {
- case "tcp":
- case "ssl":
- result = new NettyTcpTransport(remoteURI, transportOptions);
- break;
- case "ws":
- case "wss":
- result = new NettyWSTransport(remoteURI, transportOptions);
- break;
- default:
- throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
deleted file mode 100644
index 0163517..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportListener.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import io.netty.buffer.ByteBuf;
-
-/**
- * Listener interface that should be implemented by users of the various QpidJMS Transport
- * classes.
- */
-public interface NettyTransportListener {
-
- /**
- * Called when new incoming data has become available.
- *
- * @param incoming
- * the next incoming packet of data.
- */
- void onData(ByteBuf incoming);
-
- /**
- * Called if the connection state becomes closed.
- */
- void onTransportClosed();
-
- /**
- * Called when an error occurs during normal Transport operations.
- *
- * @param cause
- * the error that triggered this event.
- */
- void onTransportError(Throwable cause);
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
deleted file mode 100644
index c5022c1..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportOptions.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-/**
- * Encapsulates all the TCP Transport options in one configuration object.
- */
-public class NettyTransportOptions implements Cloneable {
-
- public static final int DEFAULT_SEND_BUFFER_SIZE = 64 * 1024;
- public static final int DEFAULT_RECEIVE_BUFFER_SIZE = DEFAULT_SEND_BUFFER_SIZE;
- public static final int DEFAULT_TRAFFIC_CLASS = 0;
- public static final boolean DEFAULT_TCP_NO_DELAY = true;
- public static final boolean DEFAULT_TCP_KEEP_ALIVE = false;
- public static final int DEFAULT_SO_LINGER = Integer.MIN_VALUE;
- public static final int DEFAULT_SO_TIMEOUT = -1;
- public static final int DEFAULT_CONNECT_TIMEOUT = 60000;
- public static final int DEFAULT_TCP_PORT = 5672;
- public static final boolean DEFAULT_TRACE_BYTES = false;
-
- public static final NettyTransportOptions INSTANCE = new NettyTransportOptions();
-
- private int sendBufferSize = DEFAULT_SEND_BUFFER_SIZE;
- private int receiveBufferSize = DEFAULT_RECEIVE_BUFFER_SIZE;
- private int trafficClass = DEFAULT_TRAFFIC_CLASS;
- private int connectTimeout = DEFAULT_CONNECT_TIMEOUT;
- private int soTimeout = DEFAULT_SO_TIMEOUT;
- private int soLinger = DEFAULT_SO_LINGER;
- private boolean tcpKeepAlive = DEFAULT_TCP_KEEP_ALIVE;
- private boolean tcpNoDelay = DEFAULT_TCP_NO_DELAY;
- private int defaultTcpPort = DEFAULT_TCP_PORT;
- private boolean traceBytes = DEFAULT_TRACE_BYTES;
-
- /**
- * @return the currently set send buffer size in bytes.
- */
- public int getSendBufferSize() {
- return sendBufferSize;
- }
-
- /**
- * Sets the send buffer size in bytes, the value must be greater than zero or an
- * {@link IllegalArgumentException} will be thrown.
- *
- * @param sendBufferSize
- * the new send buffer size for the TCP Transport.
- *
- * @throws IllegalArgumentException
- * if the value given is not in the valid range.
- */
- public void setSendBufferSize(int sendBufferSize) {
- if (sendBufferSize <= 0) {
- throw new IllegalArgumentException("The send buffer size must be > 0");
- }
-
- this.sendBufferSize = sendBufferSize;
- }
-
- /**
- * @return the currently configured receive buffer size in bytes.
- */
- public int getReceiveBufferSize() {
- return receiveBufferSize;
- }
-
- /**
- * Sets the receive buffer size in bytes, the value must be greater than zero or an
- * {@link IllegalArgumentException} will be thrown.
- *
- * @param receiveBufferSize
- * the new receive buffer size for the TCP Transport.
- *
- * @throws IllegalArgumentException
- * if the value given is not in the valid range.
- */
- public void setReceiveBufferSize(int receiveBufferSize) {
- if (receiveBufferSize <= 0) {
- throw new IllegalArgumentException("The send buffer size must be > 0");
- }
-
- this.receiveBufferSize = receiveBufferSize;
- }
-
- /**
- * @return the currently configured traffic class value.
- */
- public int getTrafficClass() {
- return trafficClass;
- }
-
- /**
- * Sets the traffic class value used by the TCP connection, valid range is between 0 and 255.
- *
- * @param trafficClass
- * the new traffic class value.
- *
- * @throws IllegalArgumentException
- * if the value given is not in the valid range.
- */
- public void setTrafficClass(int trafficClass) {
- if (trafficClass < 0 || trafficClass > 255) {
- throw new IllegalArgumentException("Traffic class must be in the range [0..255]");
- }
-
- this.trafficClass = trafficClass;
- }
-
- public int getSoTimeout() {
- return soTimeout;
- }
-
- public void setSoTimeout(int soTimeout) {
- this.soTimeout = soTimeout;
- }
-
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
-
- public void setTcpNoDelay(boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
-
- public int getSoLinger() {
- return soLinger;
- }
-
- public void setSoLinger(int soLinger) {
- this.soLinger = soLinger;
- }
-
- public boolean isTcpKeepAlive() {
- return tcpKeepAlive;
- }
-
- public void setTcpKeepAlive(boolean keepAlive) {
- this.tcpKeepAlive = keepAlive;
- }
-
- public int getConnectTimeout() {
- return connectTimeout;
- }
-
- public void setConnectTimeout(int connectTimeout) {
- this.connectTimeout = connectTimeout;
- }
-
- public int getDefaultTcpPort() {
- return defaultTcpPort;
- }
-
- public void setDefaultTcpPort(int defaultTcpPort) {
- this.defaultTcpPort = defaultTcpPort;
- }
-
- /**
- * @return true if the transport should enable byte tracing
- */
- public boolean isTraceBytes() {
- return traceBytes;
- }
-
- /**
- * Determines if the transport should add a logger for bytes in / out
- *
- * @param traceBytes
- * should the transport log the bytes in and out.
- */
- public void setTraceBytes(boolean traceBytes) {
- this.traceBytes = traceBytes;
- }
-
- public boolean isSSL() {
- return false;
- }
-
- @Override
- public NettyTransportOptions clone() {
- return copyOptions(new NettyTransportOptions());
- }
-
- protected NettyTransportOptions copyOptions(NettyTransportOptions copy) {
- copy.setConnectTimeout(getConnectTimeout());
- copy.setReceiveBufferSize(getReceiveBufferSize());
- copy.setSendBufferSize(getSendBufferSize());
- copy.setSoLinger(getSoLinger());
- copy.setSoTimeout(getSoTimeout());
- copy.setTcpKeepAlive(isTcpKeepAlive());
- copy.setTcpNoDelay(isTcpNoDelay());
- copy.setTrafficClass(getTrafficClass());
-
- return copy;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
deleted file mode 100644
index 3289fce..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Holds the defined SSL options for connections that operate over a secure transport. Options
- * are read from the environment and can be overridden by specifying them on the connection URI.
- */
-public class NettyTransportSslOptions extends NettyTransportOptions {
-
- public static final String DEFAULT_STORE_TYPE = "jks";
- public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
- public static final boolean DEFAULT_TRUST_ALL = false;
- public static final boolean DEFAULT_VERIFY_HOST = false;
- public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[] {"SSLv2Hello", "SSLv3"}));
- public static final int DEFAULT_SSL_PORT = 5671;
-
- public static final NettyTransportSslOptions INSTANCE = new NettyTransportSslOptions();
-
- private String keyStoreLocation;
- private String keyStorePassword;
- private String trustStoreLocation;
- private String trustStorePassword;
- private String storeType = DEFAULT_STORE_TYPE;
- private String[] enabledCipherSuites;
- private String[] disabledCipherSuites;
- private String[] enabledProtocols;
- private String[] disabledProtocols = DEFAULT_DISABLED_PROTOCOLS.toArray(new String[0]);
- private String contextProtocol = DEFAULT_CONTEXT_PROTOCOL;
-
- private boolean trustAll = DEFAULT_TRUST_ALL;
- private boolean verifyHost = DEFAULT_VERIFY_HOST;
- private String keyAlias;
- private int defaultSslPort = DEFAULT_SSL_PORT;
-
- static {
- INSTANCE.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
- INSTANCE.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
- INSTANCE.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore"));
- INSTANCE.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
- }
-
- /**
- * @return the keyStoreLocation currently configured.
- */
- public String getKeyStoreLocation() {
- return keyStoreLocation;
- }
-
- /**
- * Sets the location on disk of the key store to use.
- *
- * @param keyStoreLocation
- * the keyStoreLocation to use to create the key manager.
- */
- public void setKeyStoreLocation(String keyStoreLocation) {
- this.keyStoreLocation = keyStoreLocation;
- }
-
- /**
- * @return the keyStorePassword
- */
- public String getKeyStorePassword() {
- return keyStorePassword;
- }
-
- /**
- * @param keyStorePassword
- * the keyStorePassword to set
- */
- public void setKeyStorePassword(String keyStorePassword) {
- this.keyStorePassword = keyStorePassword;
- }
-
- /**
- * @return the trustStoreLocation
- */
- public String getTrustStoreLocation() {
- return trustStoreLocation;
- }
-
- /**
- * @param trustStoreLocation
- * the trustStoreLocation to set
- */
- public void setTrustStoreLocation(String trustStoreLocation) {
- this.trustStoreLocation = trustStoreLocation;
- }
-
- /**
- * @return the trustStorePassword
- */
- public String getTrustStorePassword() {
- return trustStorePassword;
- }
-
- /**
- * @param trustStorePassword
- * the trustStorePassword to set
- */
- public void setTrustStorePassword(String trustStorePassword) {
- this.trustStorePassword = trustStorePassword;
- }
-
- /**
- * @return the storeType
- */
- public String getStoreType() {
- return storeType;
- }
-
- /**
- * @param storeType
- * the format that the store files are encoded in.
- */
- public void setStoreType(String storeType) {
- this.storeType = storeType;
- }
-
- /**
- * @return the enabledCipherSuites
- */
- public String[] getEnabledCipherSuites() {
- return enabledCipherSuites;
- }
-
- /**
- * @param enabledCipherSuites
- * the enabledCipherSuites to set
- */
- public void setEnabledCipherSuites(String[] enabledCipherSuites) {
- this.enabledCipherSuites = enabledCipherSuites;
- }
-
- /**
- * @return the disabledCipherSuites
- */
- public String[] getDisabledCipherSuites() {
- return disabledCipherSuites;
- }
-
- /**
- * @param disabledCipherSuites
- * the disabledCipherSuites to set
- */
- public void setDisabledCipherSuites(String[] disabledCipherSuites) {
- this.disabledCipherSuites = disabledCipherSuites;
- }
-
- /**
- * @return the enabledProtocols or null if the defaults should be used
- */
- public String[] getEnabledProtocols() {
- return enabledProtocols;
- }
-
- /**
- * The protocols to be set as enabled.
- *
- * @param enabledProtocols
- * the enabled protocols to set, or null if the defaults should be used.
- */
- public void setEnabledProtocols(String[] enabledProtocols) {
- this.enabledProtocols = enabledProtocols;
- }
-
- /**
- *
- * @return the protocols to disable or null if none should be
- */
- public String[] getDisabledProtocols() {
- return disabledProtocols;
- }
-
- /**
- * The protocols to be disable.
- *
- * @param disabledProtocols
- * the protocols to disable, or null if none should be.
- */
- public void setDisabledProtocols(String[] disabledProtocols) {
- this.disabledProtocols = disabledProtocols;
- }
-
- /**
- * @return the context protocol to use
- */
- public String getContextProtocol() {
- return contextProtocol;
- }
-
- /**
- * The protocol value to use when creating an SSLContext via
- * SSLContext.getInstance(protocol).
- *
- * @param contextProtocol
- * the context protocol to use.
- */
- public void setContextProtocol(String contextProtocol) {
- this.contextProtocol = contextProtocol;
- }
-
- /**
- * @return the trustAll
- */
- public boolean isTrustAll() {
- return trustAll;
- }
-
- /**
- * @param trustAll
- * the trustAll to set
- */
- public void setTrustAll(boolean trustAll) {
- this.trustAll = trustAll;
- }
-
- /**
- * @return the verifyHost
- */
- public boolean isVerifyHost() {
- return verifyHost;
- }
-
- /**
- * @param verifyHost
- * the verifyHost to set
- */
- public void setVerifyHost(boolean verifyHost) {
- this.verifyHost = verifyHost;
- }
-
- /**
- * @return the key alias
- */
- public String getKeyAlias() {
- return keyAlias;
- }
-
- /**
- * @param keyAlias
- * the key alias to use
- */
- public void setKeyAlias(String keyAlias) {
- this.keyAlias = keyAlias;
- }
-
- public int getDefaultSslPort() {
- return defaultSslPort;
- }
-
- public void setDefaultSslPort(int defaultSslPort) {
- this.defaultSslPort = defaultSslPort;
- }
-
- @Override
- public boolean isSSL() {
- return true;
- }
-
- @Override
- public NettyTransportSslOptions clone() {
- return copyOptions(new NettyTransportSslOptions());
- }
-
- protected NettyTransportSslOptions copyOptions(NettyTransportSslOptions copy) {
- super.copyOptions(copy);
-
- copy.setKeyStoreLocation(getKeyStoreLocation());
- copy.setKeyStorePassword(getKeyStorePassword());
- copy.setTrustStoreLocation(getTrustStoreLocation());
- copy.setTrustStorePassword(getTrustStorePassword());
- copy.setStoreType(getStoreType());
- copy.setEnabledCipherSuites(getEnabledCipherSuites());
- copy.setDisabledCipherSuites(getDisabledCipherSuites());
- copy.setEnabledProtocols(getEnabledProtocols());
- copy.setDisabledProtocols(getDisabledProtocols());
- copy.setTrustAll(isTrustAll());
- copy.setVerifyHost(isVerifyHost());
- copy.setKeyAlias(getKeyAlias());
- copy.setContextProtocol(getContextProtocol());
- return copy;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
deleted file mode 100644
index d41c669..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSupport.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URI;
-import java.security.KeyStore;
-import java.security.KeyStoreException;
-import java.security.SecureRandom;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.TrustManager;
-import javax.net.ssl.TrustManagerFactory;
-import javax.net.ssl.X509ExtendedKeyManager;
-import javax.net.ssl.X509TrustManager;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.handler.ssl.SslHandler;
-
-/**
- * Static class that provides various utility methods used by Transport implementations.
- */
-public class NettyTransportSupport {
-
- private static final Logger LOG = LoggerFactory.getLogger(NettyTransportSupport.class);
-
- /**
- * Creates a Netty SslHandler instance for use in Transports that require an SSL encoder /
- * decoder.
- *
- * @param remote
- * The URI of the remote peer that the SslHandler will be used against.
- * @param options
- * The SSL options object to build the SslHandler instance from.
- *
- * @return a new SslHandler that is configured from the given options.
- *
- * @throws Exception
- * if an error occurs while creating the SslHandler instance.
- */
- public static SslHandler createSslHandler(URI remote, NettyTransportSslOptions options) throws Exception {
- return new SslHandler(createSslEngine(remote, createSslContext(options), options));
- }
-
- /**
- * Create a new SSLContext using the options specific in the given TransportSslOptions
- * instance.
- *
- * @param options
- * the configured options used to create the SSLContext.
- *
- * @return a new SSLContext instance.
- *
- * @throws Exception
- * if an error occurs while creating the context.
- */
- public static SSLContext createSslContext(NettyTransportSslOptions options) throws Exception {
- try {
- String contextProtocol = options.getContextProtocol();
- LOG.trace("Getting SSLContext instance using protocol: {}", contextProtocol);
-
- SSLContext context = SSLContext.getInstance(contextProtocol);
- KeyManager[] keyMgrs = loadKeyManagers(options);
- TrustManager[] trustManagers = loadTrustManagers(options);
-
- context.init(keyMgrs, trustManagers, new SecureRandom());
- return context;
- } catch (Exception e) {
- LOG.error("Failed to create SSLContext: {}", e, e);
- throw e;
- }
- }
-
- /**
- * Create a new SSLEngine instance in client mode from the given SSLContext and
- * TransportSslOptions instances.
- *
- * @param context
- * the SSLContext to use when creating the engine.
- * @param options
- * the TransportSslOptions to use to configure the new SSLEngine.
- *
- * @return a new SSLEngine instance in client mode.
- *
- * @throws Exception
- * if an error occurs while creating the new SSLEngine.
- */
- public static SSLEngine createSslEngine(SSLContext context, NettyTransportSslOptions options) throws Exception {
- return createSslEngine(null, context, options);
- }
-
- /**
- * Create a new SSLEngine instance in client mode from the given SSLContext and
- * TransportSslOptions instances.
- *
- * @param remote
- * the URI of the remote peer that will be used to initialize the engine, may be null
- * if none should.
- * @param context
- * the SSLContext to use when creating the engine.
- * @param options
- * the TransportSslOptions to use to configure the new SSLEngine.
- *
- * @return a new SSLEngine instance in client mode.
- *
- * @throws Exception
- * if an error occurs while creating the new SSLEngine.
- */
- public static SSLEngine createSslEngine(URI remote, SSLContext context, NettyTransportSslOptions options) throws Exception {
- SSLEngine engine = null;
- if (remote == null) {
- engine = context.createSSLEngine();
- } else {
- engine = context.createSSLEngine(remote.getHost(), remote.getPort());
- }
-
- engine.setEnabledProtocols(buildEnabledProtocols(engine, options));
- engine.setEnabledCipherSuites(buildEnabledCipherSuites(engine, options));
- engine.setUseClientMode(true);
-
- if (options.isVerifyHost()) {
- SSLParameters sslParameters = engine.getSSLParameters();
- sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
- engine.setSSLParameters(sslParameters);
- }
-
- return engine;
- }
-
- private static String[] buildEnabledProtocols(SSLEngine engine, NettyTransportSslOptions options) {
- List<String> enabledProtocols = new ArrayList<>();
-
- if (options.getEnabledProtocols() != null) {
- List<String> configuredProtocols = Arrays.asList(options.getEnabledProtocols());
- LOG.trace("Configured protocols from transport options: {}", configuredProtocols);
- enabledProtocols.addAll(configuredProtocols);
- } else {
- List<String> engineProtocols = Arrays.asList(engine.getEnabledProtocols());
- LOG.trace("Default protocols from the SSLEngine: {}", engineProtocols);
- enabledProtocols.addAll(engineProtocols);
- }
-
- String[] disabledProtocols = options.getDisabledProtocols();
- if (disabledProtocols != null) {
- List<String> disabled = Arrays.asList(disabledProtocols);
- LOG.trace("Disabled protocols: {}", disabled);
- enabledProtocols.removeAll(disabled);
- }
-
- LOG.trace("Enabled protocols: {}", enabledProtocols);
-
- return enabledProtocols.toArray(new String[0]);
- }
-
- private static String[] buildEnabledCipherSuites(SSLEngine engine, NettyTransportSslOptions options) {
- List<String> enabledCipherSuites = new ArrayList<>();
-
- if (options.getEnabledCipherSuites() != null) {
- List<String> configuredCipherSuites = Arrays.asList(options.getEnabledCipherSuites());
- LOG.trace("Configured cipher suites from transport options: {}", configuredCipherSuites);
- enabledCipherSuites.addAll(configuredCipherSuites);
- } else {
- List<String> engineCipherSuites = Arrays.asList(engine.getEnabledCipherSuites());
- LOG.trace("Default cipher suites from the SSLEngine: {}", engineCipherSuites);
- enabledCipherSuites.addAll(engineCipherSuites);
- }
-
- String[] disabledCipherSuites = options.getDisabledCipherSuites();
- if (disabledCipherSuites != null) {
- List<String> disabled = Arrays.asList(disabledCipherSuites);
- LOG.trace("Disabled cipher suites: {}", disabled);
- enabledCipherSuites.removeAll(disabled);
- }
-
- LOG.trace("Enabled cipher suites: {}", enabledCipherSuites);
-
- return enabledCipherSuites.toArray(new String[0]);
- }
-
- private static TrustManager[] loadTrustManagers(NettyTransportSslOptions options) throws Exception {
- if (options.isTrustAll()) {
- return new TrustManager[] {createTrustAllTrustManager()};
- }
-
- if (options.getTrustStoreLocation() == null) {
- return null;
- }
-
- TrustManagerFactory fact = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
-
- String storeLocation = options.getTrustStoreLocation();
- String storePassword = options.getTrustStorePassword();
- String storeType = options.getStoreType();
-
- LOG.trace("Attempt to load TrustStore from location {} of type {}", storeLocation, storeType);
-
- KeyStore trustStore = loadStore(storeLocation, storePassword, storeType);
- fact.init(trustStore);
-
- return fact.getTrustManagers();
- }
-
- private static KeyManager[] loadKeyManagers(NettyTransportSslOptions options) throws Exception {
- if (options.getKeyStoreLocation() == null) {
- return null;
- }
-
- KeyManagerFactory fact = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
-
- String storeLocation = options.getKeyStoreLocation();
- String storePassword = options.getKeyStorePassword();
- String storeType = options.getStoreType();
- String alias = options.getKeyAlias();
-
- LOG.trace("Attempt to load KeyStore from location {} of type {}", storeLocation, storeType);
-
- KeyStore keyStore = loadStore(storeLocation, storePassword, storeType);
- fact.init(keyStore, storePassword != null ? storePassword.toCharArray() : null);
-
- if (alias == null) {
- return fact.getKeyManagers();
- } else {
- validateAlias(keyStore, alias);
- return wrapKeyManagers(alias, fact.getKeyManagers());
- }
- }
-
- private static KeyManager[] wrapKeyManagers(String alias, KeyManager[] origKeyManagers) {
- KeyManager[] keyManagers = new KeyManager[origKeyManagers.length];
- for (int i = 0; i < origKeyManagers.length; i++) {
- KeyManager km = origKeyManagers[i];
- if (km instanceof X509ExtendedKeyManager) {
- km = new X509AliasKeyManager(alias, (X509ExtendedKeyManager) km);
- }
-
- keyManagers[i] = km;
- }
-
- return keyManagers;
- }
-
- private static void validateAlias(KeyStore store, String alias) throws IllegalArgumentException, KeyStoreException {
- if (!store.containsAlias(alias)) {
- throw new IllegalArgumentException("The alias '" + alias + "' doesn't exist in the key store");
- }
-
- if (!store.isKeyEntry(alias)) {
- throw new IllegalArgumentException("The alias '" + alias + "' in the keystore doesn't represent a key entry");
- }
- }
-
- private static KeyStore loadStore(String storePath, final String password, String storeType) throws Exception {
- KeyStore store = KeyStore.getInstance(storeType);
- try (InputStream in = new FileInputStream(new File(storePath));) {
- store.load(in, password != null ? password.toCharArray() : null);
- }
-
- return store;
- }
-
- private static TrustManager createTrustAllTrustManager() {
- return new X509TrustManager() {
- @Override
- public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
- }
-
- @Override
- public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
- }
-
- @Override
- public X509Certificate[] getAcceptedIssuers() {
- return new X509Certificate[0];
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
deleted file mode 100644
index 9b0e6e2..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import java.io.IOException;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http.DefaultHttpHeaders;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.ContinuationWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-
-/**
- * Transport for communicating over WebSockets
- */
-public class NettyWSTransport extends NettyTcpTransport {
-
- private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
-
- private static final String AMQP_SUB_PROTOCOL = "amqp";
-
- /**
- * Create a new transport instance
- *
- * @param remoteLocation
- * the URI that defines the remote resource to connect to.
- * @param options
- * the transport options used to configure the socket connection.
- */
- public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
- this(null, remoteLocation, options);
- }
-
- /**
- * Create a new transport instance
- *
- * @param listener
- * the TransportListener that will receive events from this Transport.
- * @param remoteLocation
- * the URI that defines the remote resource to connect to.
- * @param options
- * the transport options used to configure the socket connection.
- */
- public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
- super(listener, remoteLocation, options);
- }
-
- @Override
- public void send(ByteBuf output) throws IOException {
- checkConnected();
- int length = output.readableBytes();
- if (length == 0) {
- return;
- }
-
- LOG.trace("Attempted write of: {} bytes", length);
-
- channel.writeAndFlush(new BinaryWebSocketFrame(output));
- }
-
- @Override
- protected ChannelInboundHandlerAdapter createChannelHandler() {
- return new NettyWebSocketTransportHandler();
- }
-
- @Override
- protected void addAdditionalHandlers(ChannelPipeline pipeline) {
- pipeline.addLast(new HttpClientCodec());
- pipeline.addLast(new HttpObjectAggregator(8192));
- }
-
- @Override
- protected void handleConnected(Channel channel) throws Exception {
- LOG.trace("Channel has become active, awaiting WebSocket handshake! Channel is {}", channel);
- }
-
- // ----- Handle connection events -----------------------------------------//
-
- private class NettyWebSocketTransportHandler extends NettyDefaultHandler<Object> {
-
- private final WebSocketClientHandshaker handshaker;
-
- NettyWebSocketTransportHandler() {
- handshaker = WebSocketClientHandshakerFactory.newHandshaker(
- getRemoteLocation(), WebSocketVersion.V13, AMQP_SUB_PROTOCOL,
- true, new DefaultHttpHeaders(), getMaxFrameSize());
- }
-
- @Override
- public void channelActive(ChannelHandlerContext context) throws Exception {
- handshaker.handshake(context.channel());
-
- super.channelActive(context);
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
- LOG.trace("New data read: incoming: {}", message);
-
- Channel ch = ctx.channel();
- if (!handshaker.isHandshakeComplete()) {
- handshaker.finishHandshake(ch, (FullHttpResponse) message);
- LOG.trace("WebSocket Client connected! {}", ctx.channel());
- // Now trigger super processing as we are really connected.
- NettyWSTransport.super.handleConnected(ch);
- return;
- }
-
- // We shouldn't get this since we handle the handshake previously.
- if (message instanceof FullHttpResponse) {
- FullHttpResponse response = (FullHttpResponse) message;
- throw new IllegalStateException(
- "Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
- }
-
- WebSocketFrame frame = (WebSocketFrame) message;
- if (frame instanceof TextWebSocketFrame) {
- TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
- LOG.warn("WebSocket Client received message: " + textFrame.text());
- ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
- } else if (frame instanceof BinaryWebSocketFrame) {
- BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
- LOG.trace("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
- listener.onData(binaryFrame.content());
- } else if (frame instanceof ContinuationWebSocketFrame) {
- ContinuationWebSocketFrame continuationFrame = (ContinuationWebSocketFrame) frame;
- LOG.trace("WebSocket Client received data continuation: {} bytes", continuationFrame.content().readableBytes());
- listener.onData(continuationFrame.content());
- } else if (frame instanceof PingWebSocketFrame) {
- LOG.trace("WebSocket Client received ping, response with pong");
- ch.write(new PongWebSocketFrame(frame.content()));
- } else if (frame instanceof CloseWebSocketFrame) {
- LOG.trace("WebSocket Client received closing");
- ch.close();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
deleted file mode 100644
index 42d6a0b..0000000
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/transport/X509AliasKeyManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.activemq.transport.amqp.client.transport;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.X509ExtendedKeyManager;
-import java.net.Socket;
-import java.security.Principal;
-import java.security.PrivateKey;
-import java.security.cert.X509Certificate;
-
-/**
- * An X509ExtendedKeyManager wrapper which always chooses and only
- * returns the given alias, and defers retrieval to the delegate
- * key manager.
- */
-public class X509AliasKeyManager extends X509ExtendedKeyManager {
-
- private X509ExtendedKeyManager delegate;
- private String alias;
-
- public X509AliasKeyManager(String alias, X509ExtendedKeyManager delegate) throws IllegalArgumentException {
- if (alias == null) {
- throw new IllegalArgumentException("The given key alias must not be null.");
- }
-
- this.alias = alias;
- this.delegate = delegate;
- }
-
- @Override
- public String chooseClientAlias(String[] keyType, Principal[] issuers, Socket socket) {
- return alias;
- }
-
- @Override
- public String chooseServerAlias(String keyType, Principal[] issuers, Socket socket) {
- return alias;
- }
-
- @Override
- public X509Certificate[] getCertificateChain(String alias) {
- return delegate.getCertificateChain(alias);
- }
-
- @Override
- public String[] getClientAliases(String keyType, Principal[] issuers) {
- return new String[]{alias};
- }
-
- @Override
- public PrivateKey getPrivateKey(String alias) {
- return delegate.getPrivateKey(alias);
- }
-
- @Override
- public String[] getServerAliases(String keyType, Principal[] issuers) {
- return new String[]{alias};
- }
-
- @Override
- public String chooseEngineClientAlias(String[] keyType, Principal[] issuers, SSLEngine engine) {
- return alias;
- }
-
- @Override
- public String chooseEngineServerAlias(String keyType, Principal[] issuers, SSLEngine engine) {
- return alias;
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5211afdf/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
new file mode 100644
index 0000000..9eab670
--- /dev/null
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/netty/NettyTcpTransport.java
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.netty;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.logging.LoggingHandler;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ */
+public class NettyTcpTransport implements NettyTransport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+ private static final int SHUTDOWN_TIMEOUT = 100;
+ public static final int DEFAULT_MAX_FRAME_SIZE = 65535;
+
+ protected Bootstrap bootstrap;
+ protected EventLoopGroup group;
+ protected Channel channel;
+ protected NettyTransportListener listener;
+ protected final NettyTransportOptions options;
+ protected final URI remote;
+ protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final CountDownLatch connectLatch = new CountDownLatch(1);
+ private volatile IOException failureCause;
+
+ /**
+ * Create a new transport instance
+ *
+ * @param remoteLocation
+ * the URI that defines the remote resource to connect to.
+ * @param options
+ * the transport options used to configure the socket connection.
+ */
+ public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
+ this(null, remoteLocation, options);
+ }
+
+ /**
+ * Create a new transport instance
+ *
+ * @param listener
+ * the TransportListener that will receive events from this Transport.
+ * @param remoteLocation
+ * the URI that defines the remote resource to connect to.
+ * @param options
+ * the transport options used to configure the socket connection.
+ */
+ public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+ if (options == null) {
+ throw new IllegalArgumentException("Transport Options cannot be null");
+ }
+
+ if (remoteLocation == null) {
+ throw new IllegalArgumentException("Transport remote location cannot be null");
+ }
+
+ this.options = options;
+ this.listener = listener;
+ this.remote = remoteLocation;
+ }
+
+ @Override
+ public void connect() throws IOException {
+
+ if (listener == null) {
+ throw new IllegalStateException("A transport listener must be set before connection attempts.");
+ }
+
+ final SslHandler sslHandler;
+ if (isSSL()) {
+ try {
+ sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+ } catch (Exception ex) {
+ // TODO: can we stop it throwing Exception?
+ throw IOExceptionSupport.create(ex);
+ }
+ } else {
+ sslHandler = null;
+ }
+
+ group = new NioEventLoopGroup(1);
+
+ bootstrap = new Bootstrap();
+ bootstrap.group(group);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.handler(new ChannelInitializer<Channel>() {
+ @Override
+ public void initChannel(Channel connectedChannel) throws Exception {
+ configureChannel(connectedChannel, sslHandler);
+ }
+ });
+
+ configureNetty(bootstrap, getTransportOptions());
+
+ ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
+ future.addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ handleException(future.channel(), IOExceptionSupport.create(future.cause()));
+ }
+ }
+ });
+
+ try {
+ connectLatch.await();
+ } catch (InterruptedException ex) {
+ LOG.debug("Transport connection was interrupted.");
+ Thread.interrupted();
+ failureCause = IOExceptionSupport.create(ex);
+ }
+
+ if (failureCause != null) {
+ // Close out any Netty resources now as they are no longer needed.
+ if (channel != null) {
+ channel.close().syncUninterruptibly();
+ channel = null;
+ }
+ if (group != null) {
+ Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+ LOG.trace("Channel group shutdown failed to complete in allotted time");
+ }
+ group = null;
+ }
+
+ throw failureCause;
+ } else {
+ // Connected, allow any held async error to fire now and close the transport.
+ channel.eventLoop().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ if (failureCause != null) {
+ channel.pipeline().fireExceptionCaught(failureCause);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ return connected.get();
+ }
+
+ @Override
+ public boolean isSSL() {
+ return options.isSSL();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ connected.set(false);
+ try {
+ if (channel != null) {
+ channel.close().syncUninterruptibly();
+ }
+ } finally {
+ if (group != null) {
+ Future<?> fut = group.shutdownGracefully(0, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!fut.awaitUninterruptibly(2 * SHUTDOWN_TIMEOUT)) {
+ LOG.trace("Channel group shutdown failed to complete in allotted time");
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public ByteBuf allocateSendBuffer(int size) throws IOException {
+ checkConnected();
+ return channel.alloc().ioBuffer(size, size);
+ }
+
+ @Override
+ public ChannelFuture send(ByteBuf output) throws IOException {
+ checkConnected();
+ int length = output.readableBytes();
+ if (length == 0) {
+ return null;
+ }
+
+ LOG.trace("Attempted write of: {} bytes", length);
+
+ return channel.writeAndFlush(output);
+ }
+
+ @Override
+ public NettyTransportListener getTransportListener() {
+ return listener;
+ }
+
+ @Override
+ public void setTransportListener(NettyTransportListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public NettyTransportOptions getTransportOptions() {
+ return options;
+ }
+
+ @Override
+ public URI getRemoteLocation() {
+ return remote;
+ }
+
+ @Override
+ public Principal getLocalPrincipal() {
+ Principal result = null;
+
+ if (isSSL()) {
+ SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+ result = sslHandler.engine().getSession().getLocalPrincipal();
+ }
+
+ return result;
+ }
+
+ @Override
+ public void setMaxFrameSize(int maxFrameSize) {
+ if (connected.get()) {
+ throw new IllegalStateException("Cannot change Max Frame Size while connected.");
+ }
+
+ this.maxFrameSize = maxFrameSize;
+ }
+
+ @Override
+ public int getMaxFrameSize() {
+ return maxFrameSize;
+ }
+
+ // ----- Internal implementation details, can be overridden as needed -----//
+
+ protected String getRemoteHost() {
+ return remote.getHost();
+ }
+
+ protected int getRemotePort() {
+ if (remote.getPort() != -1) {
+ return remote.getPort();
+ } else {
+ return isSSL() ? getSslOptions().getDefaultSslPort() : getTransportOptions().getDefaultTcpPort();
+ }
+ }
+
+ protected void addAdditionalHandlers(ChannelPipeline pipeline) {
+
+ }
+
+ protected ChannelInboundHandlerAdapter createChannelHandler() {
+ return new NettyTcpTransportHandler();
+ }
+
+ // ----- Event Handlers which can be overridden in subclasses -------------//
+
+ protected void handleConnected(Channel channel) throws Exception {
+ LOG.trace("Channel has become active! Channel is {}", channel);
+ connectionEstablished(channel);
+ }
+
+ protected void handleChannelInactive(Channel channel) throws Exception {
+ LOG.trace("Channel has gone inactive! Channel is {}", channel);
+ if (connected.compareAndSet(true, false) && !closed.get()) {
+ LOG.trace("Firing onTransportClosed listener");
+ listener.onTransportClosed();
+ }
+ }
+
+ protected void handleException(Channel channel, Throwable cause) throws Exception {
+ LOG.trace("Exception on channel! Channel is {}", channel);
+ if (connected.compareAndSet(true, false) && !closed.get()) {
+ LOG.trace("Firing onTransportError listener");
+ if (failureCause != null) {
+ listener.onTransportError(failureCause);
+ } else {
+ listener.onTransportError(cause);
+ }
+ } else {
+ // Hold the first failure for later dispatch if connect succeeds.
+ // This will then trigger disconnect using the first error reported.
+ if (failureCause == null) {
+ LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+ failureCause = IOExceptionSupport.create(cause);
+ }
+
+ connectionFailed(channel, failureCause);
+ }
+ }
+
+ // ----- State change handlers and checks ---------------------------------//
+
+ protected final void checkConnected() throws IOException {
+ if (!connected.get()) {
+ throw new IOException("Cannot send to a non-connected transport.");
+ }
+ }
+
+ /*
+ * Called when the transport has successfully connected and is ready for use.
+ */
+ private void connectionEstablished(Channel connectedChannel) {
+ channel = connectedChannel;
+ connected.set(true);
+ connectLatch.countDown();
+ }
+
+ /*
+ * Called when the transport connection failed and an error should be returned.
+ */
+ private void connectionFailed(Channel failedChannel, IOException cause) {
+ failureCause = cause;
+ channel = failedChannel;
+ connected.set(false);
+ connectLatch.countDown();
+ }
+
+ private NettyTransportSslOptions getSslOptions() {
+ return (NettyTransportSslOptions) getTransportOptions();
+ }
+
+ private void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+ bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+ bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+
+ if (options.getSendBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+ }
+
+ if (options.getReceiveBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+ bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+ }
+
+ if (options.getTrafficClass() != -1) {
+ bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+ }
+ }
+
+ private void configureChannel(final Channel channel, final SslHandler sslHandler) throws Exception {
+ if (isSSL()) {
+ channel.pipeline().addLast(sslHandler);
+ }
+
+ if (getTransportOptions().isTraceBytes()) {
+ channel.pipeline().addLast("logger", new LoggingHandler(getClass()));
+ }
+
+ addAdditionalHandlers(channel.pipeline());
+
+ channel.pipeline().addLast(createChannelHandler());
+ }
+
+ // ----- Handle connection events -----------------------------------------//
+
+ protected abstract class NettyDefaultHandler<E> extends SimpleChannelInboundHandler<E> {
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext context) throws Exception {
+ channel = context.channel();
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext context) throws Exception {
+ // In the Secure case we need to let the handshake complete before we
+ // trigger the connected event.
+ if (!isSSL()) {
+ handleConnected(context.channel());
+ } else {
+ SslHandler sslHandler = context.pipeline().get(SslHandler.class);
+ sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+ @Override
+ public void operationComplete(Future<Channel> future) throws Exception {
+ if (future.isSuccess()) {
+ LOG.trace("SSL Handshake has completed: {}", channel);
+ handleConnected(channel);
+ } else {
+ LOG.trace("SSL Handshake has failed: {}", channel);
+ handleException(channel, future.cause());
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext context) throws Exception {
+ handleChannelInactive(context.channel());
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+ handleException(context.channel(), cause);
+ }
+ }
+
+ // ----- Handle Binary data from connection -------------------------------//
+
+ protected class NettyTcpTransportHandler extends NettyDefaultHandler<ByteBuf> {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+ LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
+ listener.onData(buffer);
+ }
+ }
+}