You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/30 19:14:22 UTC
[2/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6339
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
new file mode 100644
index 0000000..1b604fe
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyWSTransport.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.DefaultHttpHeaders;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpClientCodec;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketVersion;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * Transport for communicating over WebSockets
+ */
+public class NettyWSTransport implements NettyTransport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(NettyWSTransport.class);
+
+ private static final int QUIET_PERIOD = 20;
+ private static final int SHUTDOWN_TIMEOUT = 100;
+
+ protected Bootstrap bootstrap;
+ protected EventLoopGroup group;
+ protected Channel channel;
+ protected NettyTransportListener listener;
+ protected NettyTransportOptions options;
+ protected final URI remote;
+ protected boolean secure;
+
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private ChannelPromise handshakeFuture;
+ private IOException failureCause;
+ private Throwable pendingFailure;
+
+ /**
+ * Create a new transport instance
+ *
+ * @param remoteLocation
+ * the URI that defines the remote resource to connect to.
+ * @param options
+ * the transport options used to configure the socket connection.
+ */
+ public NettyWSTransport(URI remoteLocation, NettyTransportOptions options) {
+ this(null, remoteLocation, options);
+ }
+
+ /**
+ * Create a new transport instance
+ *
+ * @param listener
+ * the TransportListener that will receive events from this Transport.
+ * @param remoteLocation
+ * the URI that defines the remote resource to connect to.
+ * @param options
+ * the transport options used to configure the socket connection.
+ */
+ public NettyWSTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+ this.options = options;
+ this.listener = listener;
+ this.remote = remoteLocation;
+ this.secure = remoteLocation.getScheme().equalsIgnoreCase("wss");
+ }
+
+ @Override
+ public void connect() throws IOException {
+
+ if (listener == null) {
+ throw new IllegalStateException("A transport listener must be set before connection attempts.");
+ }
+
+ group = new NioEventLoopGroup(1);
+
+ bootstrap = new Bootstrap();
+ bootstrap.group(group);
+ bootstrap.channel(NioSocketChannel.class);
+ bootstrap.handler(new ChannelInitializer<Channel>() {
+
+ @Override
+ public void initChannel(Channel connectedChannel) throws Exception {
+ configureChannel(connectedChannel);
+ }
+ });
+
+ configureNetty(bootstrap, getTransportOptions());
+
+ ChannelFuture future;
+ try {
+ future = bootstrap.connect(getRemoteHost(), getRemotePort());
+ future.addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ handleConnected(future.channel());
+ } else if (future.isCancelled()) {
+ connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+ } else {
+ connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+ }
+ }
+ });
+
+ future.sync();
+
+ // Now wait for WS protocol level handshake completion
+ handshakeFuture.await();
+ } catch (InterruptedException ex) {
+ LOG.debug("Transport connection attempt was interrupted.");
+ Thread.interrupted();
+ failureCause = IOExceptionSupport.create(ex);
+ }
+
+ if (failureCause != null) {
+ // Close out any Netty resources now as they are no longer needed.
+ if (channel != null) {
+ channel.close().syncUninterruptibly();
+ channel = null;
+ }
+ if (group != null) {
+ group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ group = null;
+ }
+
+ throw failureCause;
+ } else {
+ // Connected, allow any held async error to fire now and close the transport.
+ channel.eventLoop().execute(new Runnable() {
+
+ @Override
+ public void run() {
+ if (pendingFailure != null) {
+ channel.pipeline().fireExceptionCaught(pendingFailure);
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public boolean isConnected() {
+ return connected.get();
+ }
+
+ @Override
+ public boolean isSSL() {
+ return secure;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed.compareAndSet(false, true)) {
+ connected.set(false);
+ if (channel != null) {
+ channel.close().syncUninterruptibly();
+ }
+ if (group != null) {
+ group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ @Override
+ public ByteBuf allocateSendBuffer(int size) throws IOException {
+ checkConnected();
+ return channel.alloc().ioBuffer(size, size);
+ }
+
+ @Override
+ public void send(ByteBuf output) throws IOException {
+ checkConnected();
+ int length = output.readableBytes();
+ if (length == 0) {
+ return;
+ }
+
+ LOG.trace("Attempted write of: {} bytes", length);
+
+ channel.writeAndFlush(new BinaryWebSocketFrame(output));
+ }
+
+ @Override
+ public NettyTransportListener getTransportListener() {
+ return listener;
+ }
+
+ @Override
+ public void setTransportListener(NettyTransportListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public NettyTransportOptions getTransportOptions() {
+ if (options == null) {
+ if (isSSL()) {
+ options = NettyTransportSslOptions.INSTANCE;
+ } else {
+ options = NettyTransportOptions.INSTANCE;
+ }
+ }
+
+ return options;
+ }
+
+ @Override
+ public URI getRemoteLocation() {
+ return remote;
+ }
+
+ @Override
+ public Principal getLocalPrincipal() {
+ if (!isSSL()) {
+ throw new UnsupportedOperationException("Not connected to a secure channel");
+ }
+
+ SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+
+ return sslHandler.engine().getSession().getLocalPrincipal();
+ }
+
+ //----- Internal implementation details, can be overridden as needed --//
+
+ protected String getRemoteHost() {
+ return remote.getHost();
+ }
+
+ protected int getRemotePort() {
+ int port = remote.getPort();
+
+ if (port <= 0) {
+ if (isSSL()) {
+ port = getSslOptions().getDefaultSslPort();
+ } else {
+ port = getTransportOptions().getDefaultTcpPort();
+ }
+ }
+
+ return port;
+ }
+
+ protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+ bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+ bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+ bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+ if (options.getSendBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+ }
+
+ if (options.getReceiveBufferSize() != -1) {
+ bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+ bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+ }
+
+ if (options.getTrafficClass() != -1) {
+ bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+ }
+ }
+
+ protected void configureChannel(final Channel channel) throws Exception {
+ if (isSSL()) {
+ SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+ sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+ @Override
+ public void operationComplete(Future<Channel> future) throws Exception {
+ if (future.isSuccess()) {
+ LOG.trace("SSL Handshake has completed: {}", channel);
+ connectionEstablished(channel);
+ } else {
+ LOG.trace("SSL Handshake has failed: {}", channel);
+ connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+ }
+ }
+ });
+
+ channel.pipeline().addLast(sslHandler);
+ }
+
+ channel.pipeline().addLast(new HttpClientCodec());
+ channel.pipeline().addLast(new HttpObjectAggregator(8192));
+ channel.pipeline().addLast(new NettyTcpTransportHandler());
+ }
+
+ protected void handleConnected(final Channel channel) throws Exception {
+ if (!isSSL()) {
+ connectionEstablished(channel);
+ }
+ }
+
+ //----- State change handlers and checks ---------------------------------//
+
+ /**
+ * Called when the transport has successfully connected and is ready for use.
+ */
+ protected void connectionEstablished(Channel connectedChannel) {
+ LOG.info("WebSocket connectionEstablished! {}", connectedChannel);
+ channel = connectedChannel;
+ connected.set(true);
+ }
+
+ /**
+ * Called when the transport connection failed and an error should be returned.
+ *
+ * @param failedChannel
+ * The Channel instance that failed.
+ * @param cause
+ * An IOException that describes the cause of the failed connection.
+ */
+ protected void connectionFailed(Channel failedChannel, IOException cause) {
+ failureCause = IOExceptionSupport.create(cause);
+ channel = failedChannel;
+ connected.set(false);
+ handshakeFuture.setFailure(cause);
+ }
+
+ private NettyTransportSslOptions getSslOptions() {
+ return (NettyTransportSslOptions) getTransportOptions();
+ }
+
+ private void checkConnected() throws IOException {
+ if (!connected.get()) {
+ throw new IOException("Cannot send to a non-connected transport.");
+ }
+ }
+
+ //----- Handle connection events -----------------------------------------//
+
+ private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<Object> {
+
+ private final WebSocketClientHandshaker handshaker;
+
+ public NettyTcpTransportHandler() {
+ handshaker = WebSocketClientHandshakerFactory.newHandshaker(
+ remote, WebSocketVersion.V13, "amqp", false, new DefaultHttpHeaders());
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext context) {
+ LOG.trace("Handler has become added! Channel is {}", context.channel());
+ handshakeFuture = context.newPromise();
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext context) throws Exception {
+ LOG.trace("Channel has become active! Channel is {}", context.channel());
+ handshaker.handshake(context.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext context) throws Exception {
+ LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+ if (connected.compareAndSet(true, false) && !closed.get()) {
+ LOG.trace("Firing onTransportClosed listener");
+ listener.onTransportClosed();
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+ LOG.trace("Exception on channel! Channel is {} -> {}", context.channel(), cause.getMessage());
+ LOG.trace("Error Stack: ", cause);
+ if (connected.compareAndSet(true, false) && !closed.get()) {
+ LOG.trace("Firing onTransportError listener");
+ if (pendingFailure != null) {
+ listener.onTransportError(pendingFailure);
+ } else {
+ listener.onTransportError(cause);
+ }
+ } else {
+ // Hold the first failure for later dispatch if connect succeeds.
+ // This will then trigger disconnect using the first error reported.
+ if (pendingFailure != null) {
+ LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+ pendingFailure = cause;
+ }
+
+ if (!handshakeFuture.isDone()) {
+ handshakeFuture.setFailure(cause);
+ }
+ }
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, Object message) throws Exception {
+ LOG.trace("New data read: incoming: {}", message);
+
+ Channel ch = ctx.channel();
+ if (!handshaker.isHandshakeComplete()) {
+ handshaker.finishHandshake(ch, (FullHttpResponse) message);
+ LOG.info("WebSocket Client connected! {}", ctx.channel());
+ handshakeFuture.setSuccess();
+ return;
+ }
+
+ // We shouldn't get this since we handle the handshake previously.
+ if (message instanceof FullHttpResponse) {
+ FullHttpResponse response = (FullHttpResponse) message;
+ throw new IllegalStateException(
+ "Unexpected FullHttpResponse (getStatus=" + response.getStatus() +
+ ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')');
+ }
+
+ WebSocketFrame frame = (WebSocketFrame) message;
+ if (frame instanceof TextWebSocketFrame) {
+ TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
+ LOG.warn("WebSocket Client received message: " + textFrame.text());
+ ctx.fireExceptionCaught(new IOException("Received invalid frame over WebSocket."));
+ } else if (frame instanceof BinaryWebSocketFrame) {
+ BinaryWebSocketFrame binaryFrame = (BinaryWebSocketFrame) frame;
+ LOG.info("WebSocket Client received data: {} bytes", binaryFrame.content().readableBytes());
+ listener.onData(binaryFrame.content());
+ } else if (frame instanceof PongWebSocketFrame) {
+ LOG.trace("WebSocket Client received pong");
+ } else if (frame instanceof CloseWebSocketFrame) {
+ LOG.trace("WebSocket Client received closing");
+ ch.close();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
index 75e30f9..e9d50b7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpBrokerReuqestedHearbeatsTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/**
* Test handling of heartbeats requested by the broker.
*/
+@RunWith(Parameterized.class)
public class AmqpBrokerReuqestedHearbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
+ @Parameters(name="connector={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"amqp", false},
+ {"amqp+ws", false},
+ });
+ }
+
+ public AmqpBrokerReuqestedHearbeatsTest(String connectorScheme, boolean secure) {
+ super(connectorScheme, secure);
+ }
+
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.idleTimeout=" + TEST_IDLE_TIMEOUT;
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
index c7ab0cd..3c779a2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpClientRequestsHeartbeatsTest.java
@@ -21,6 +21,8 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -32,14 +34,30 @@ import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/**
* Tests that cover broker behavior when the client requests heartbeats
*/
+@RunWith(Parameterized.class)
public class AmqpClientRequestsHeartbeatsTest extends AmqpClientTestSupport {
private final int TEST_IDLE_TIMEOUT = 3000;
+ @Parameters(name="connector={0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"amqp", false},
+ {"amqp+ws", false},
+ });
+ }
+
+ public AmqpClientRequestsHeartbeatsTest(String connectorScheme, boolean secure) {
+ super(connectorScheme, secure);
+ }
+
@Override
protected String getAdditionalConfig() {
return "&transport.wireFormat.idleTimeout=0";
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
index fa519ab..2d154e6 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpConnectionsTest.java
@@ -25,6 +25,8 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Map;
import org.apache.activemq.transport.amqp.AmqpSupport;
@@ -37,16 +39,34 @@ import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
/**
* Test broker handling of AMQP connections with various configurations.
*/
+@RunWith(Parameterized.class)
public class AmqpConnectionsTest extends AmqpClientTestSupport {
private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+ @Parameters(name="{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"amqp", false},
+ {"amqp+ws", false},
+ {"amqp+ssl", true},
+ {"amqp+wss", true}
+ });
+ }
+
+ public AmqpConnectionsTest(String connectorScheme, boolean secure) {
+ super(connectorScheme, secure);
+ }
+
@Test(timeout = 60000)
public void testCanConnect() throws Exception {
AmqpClient client = createAmqpClient();
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/resources/log4j.properties b/activemq-amqp/src/test/resources/log4j.properties
index d25017d..f88b152 100755
--- a/activemq-amqp/src/test/resources/log4j.properties
+++ b/activemq-amqp/src/test/resources/log4j.properties
@@ -20,7 +20,7 @@
#
log4j.rootLogger=WARN, console, file
log4j.logger.org.apache.activemq=INFO
-log4j.logger.org.apache.activemq.transport.amqp=DEBUG
+log4j.logger.org.apache.activemq.transport.amqp=TRACE
log4j.logger.org.apache.activemq.transport.amqp.FRAMES=INFO
log4j.logger.org.fusesource=INFO
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index 9e13cf9..5f75a3c 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.vm;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
+import java.security.cert.X509Certificate;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -34,6 +35,7 @@ import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -438,4 +440,19 @@ public class VMTransport implements Transport, Task {
public int getReceiveCounter() {
return receiveCounter;
}
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return null;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java b/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
index d24596a..2067c14 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/Transport.java
@@ -18,7 +18,10 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.URI;
+import java.security.cert.X509Certificate;
+
import org.apache.activemq.Service;
+import org.apache.activemq.wireformat.WireFormat;
/**
* Represents the client side of a transport allowing messages to be sent
@@ -116,6 +119,7 @@ public interface Transport extends Service {
* @return true if updating uris is supported
*/
boolean isUpdateURIsSupported();
+
/**
* reconnect to another location
* @param uri
@@ -139,4 +143,25 @@ public interface Transport extends Service {
* @return a counter which gets incremented as data is read from the transport.
*/
int getReceiveCounter();
+
+ /**
+ * @return the Certificates provided by the peer, or null if not a secure channel.
+ */
+ X509Certificate[] getPeerCertificates();
+
+ /**
+ * Sets the certificates provided by the connected peer.
+ *
+ * @param certificates
+ * the Certificates provided by the peer.
+ */
+ void setPeerCertificates(X509Certificate[] certificates);
+
+ /**
+ * Retrieves the WireFormat instance associated with this Transport instance.
+ *
+ * @return the WireFormat in use.
+ */
+ WireFormat getWireFormat();
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
index b0fafe8..ce02a7a 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFilter.java
@@ -18,9 +18,12 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.URI;
+import java.security.cert.X509Certificate;
+
+import org.apache.activemq.wireformat.WireFormat;
/**
- *
+ *
*/
public class TransportFilter implements TransportListener, Transport {
protected final Transport next;
@@ -30,10 +33,12 @@ public class TransportFilter implements TransportListener, Transport {
this.next = next;
}
+ @Override
public TransportListener getTransportListener() {
return transportListener;
}
+ @Override
public void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null) {
@@ -48,6 +53,7 @@ public class TransportFilter implements TransportListener, Transport {
* @throws IOException
* if the next channel has not been set.
*/
+ @Override
public void start() throws Exception {
if (next == null) {
throw new IOException("The next channel has not been set.");
@@ -61,10 +67,12 @@ public class TransportFilter implements TransportListener, Transport {
/**
* @see org.apache.activemq.Service#stop()
*/
+ @Override
public void stop() throws Exception {
next.stop();
}
+ @Override
public void onCommand(Object command) {
transportListener.onCommand(command);
}
@@ -81,34 +89,42 @@ public class TransportFilter implements TransportListener, Transport {
return next.toString();
}
+ @Override
public void oneway(Object command) throws IOException {
next.oneway(command);
}
+ @Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
return next.asyncRequest(command, null);
}
+ @Override
public Object request(Object command) throws IOException {
return next.request(command);
}
+ @Override
public Object request(Object command, int timeout) throws IOException {
return next.request(command, timeout);
}
+ @Override
public void onException(IOException error) {
transportListener.onException(error);
}
+ @Override
public void transportInterupted() {
transportListener.transportInterupted();
}
+ @Override
public void transportResumed() {
transportListener.transportResumed();
}
+ @Override
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
@@ -116,6 +132,7 @@ public class TransportFilter implements TransportListener, Transport {
return next.narrow(target);
}
+ @Override
public String getRemoteAddress() {
return next.getRemoteAddress();
}
@@ -124,35 +141,58 @@ public class TransportFilter implements TransportListener, Transport {
* @return
* @see org.apache.activemq.transport.Transport#isFaultTolerant()
*/
+ @Override
public boolean isFaultTolerant() {
return next.isFaultTolerant();
}
+ @Override
public boolean isDisposed() {
return next.isDisposed();
}
+ @Override
public boolean isConnected() {
return next.isConnected();
}
+ @Override
public void reconnect(URI uri) throws IOException {
next.reconnect(uri);
}
+ @Override
public int getReceiveCounter() {
return next.getReceiveCounter();
}
+ @Override
public boolean isReconnectSupported() {
return next.isReconnectSupported();
}
+ @Override
public boolean isUpdateURIsSupported() {
return next.isUpdateURIsSupported();
}
+ @Override
public void updateURIs(boolean rebalance,URI[] uris) throws IOException {
next.updateURIs(rebalance,uris);
}
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return next.getPeerCertificates();
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ next.setPeerCertificates(certificates);
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return next.getWireFormat();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index f502179..a46b318 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -26,6 +26,7 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -61,6 +62,7 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1448,4 +1450,28 @@ public class FailoverTransport implements CompositeTransport {
public void setWarnAfterReconnectAttempts(int warnAfterReconnectAttempts) {
this.warnAfterReconnectAttempts = warnAfterReconnectAttempts;
}
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ Transport transport = connectedTransport.get();
+ if (transport != null) {
+ return transport.getPeerCertificates();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ Transport transport = connectedTransport.get();
+ if (transport != null) {
+ return transport.getWireFormat();
+ } else {
+ return null;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
index 00ae7ae..d7f4f85 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,6 +19,7 @@ package org.apache.activemq.transport.fanout;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
+import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
@@ -44,13 +45,12 @@ import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
+import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Transport that fans out a connection to multiple brokers.
- *
- *
*/
public class FanoutTransport implements CompositeTransport {
@@ -113,9 +113,9 @@ public class FanoutTransport implements CompositeTransport {
@Override
public void onCommand(Object o) {
- Command command = (Command)o;
+ Command command = (Command) o;
if (command.isResponse()) {
- Integer id = new Integer(((Response)command).getCorrelationId());
+ Integer id = new Integer(((Response) command).getCorrelationId());
RequestCounter rc = requestMap.get(id);
if (rc != null) {
if (rc.ackCount.decrementAndGet() <= 0) {
@@ -191,7 +191,7 @@ public class FanoutTransport implements CompositeTransport {
// Try to connect them up.
Iterator<FanoutTransportHandler> iter = transports.iterator();
- for (int i = 0; iter.hasNext() && !disposed; i++) {
+ while (iter.hasNext() && !disposed) {
long now = System.currentTimeMillis();
@@ -228,9 +228,9 @@ public class FanoutTransport implements CompositeTransport {
} catch (Exception e) {
LOG.debug("Connect fail to: " + uri + ", reason: " + e);
- if( fanoutHandler.transport !=null ) {
+ if (fanoutHandler.transport != null) {
ServiceSupport.dispose(fanoutHandler.transport);
- fanoutHandler.transport=null;
+ fanoutHandler.transport = null;
}
if (maxReconnectAttempts > 0 && ++fanoutHandler.connectFailures >= maxReconnectAttempts) {
@@ -256,14 +256,13 @@ public class FanoutTransport implements CompositeTransport {
}
}
}
+
if (transports.size() == connectedCount || disposed) {
reconnectMutex.notifyAll();
return false;
}
-
}
}
-
}
try {
@@ -292,7 +291,7 @@ public class FanoutTransport implements CompositeTransport {
restoreTransport(th);
}
}
- connected=true;
+ connected = true;
}
}
@@ -307,7 +306,7 @@ public class FanoutTransport implements CompositeTransport {
}
started = false;
disposed = true;
- connected=false;
+ connected = false;
for (Iterator<FanoutTransportHandler> iter = transports.iterator(); iter.hasNext();) {
FanoutTransportHandler th = iter.next();
@@ -367,7 +366,7 @@ public class FanoutTransport implements CompositeTransport {
@Override
public void oneway(Object o) throws IOException {
- final Command command = (Command)o;
+ final Command command = (Command) o;
try {
synchronized (reconnectMutex) {
@@ -392,7 +391,7 @@ public class FanoutTransport implements CompositeTransport {
}
if (error instanceof IOException) {
- throw (IOException)error;
+ throw (IOException) error;
}
throw IOExceptionSupport.create(error);
}
@@ -428,7 +427,6 @@ public class FanoutTransport implements CompositeTransport {
primary.onException(e);
}
}
-
}
} catch (InterruptedException e) {
// Some one may be trying to stop our thread.
@@ -443,13 +441,12 @@ public class FanoutTransport implements CompositeTransport {
*/
private boolean isFanoutCommand(Command command) {
if (command.isMessage()) {
- if( fanOutQueues ) {
+ if (fanOutQueues) {
return true;
}
- return ((Message)command).getDestination().isTopic();
+ return ((Message) command).getDestination().isTopic();
}
- if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE ||
- command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
+ if (command.getDataStructureType() == ConsumerInfo.DATA_STRUCTURE_TYPE || command.getDataStructureType() == RemoveInfo.DATA_STRUCTURE_TYPE) {
return false;
}
return true;
@@ -491,7 +488,6 @@ public class FanoutTransport implements CompositeTransport {
@Override
public <T> T narrow(Class<T> target) {
-
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
}
@@ -509,7 +505,6 @@ public class FanoutTransport implements CompositeTransport {
}
return null;
-
}
protected void restoreTransport(FanoutTransportHandler th) throws Exception, IOException {
@@ -523,8 +518,7 @@ public class FanoutTransport implements CompositeTransport {
}
@Override
- public void add(boolean reblance,URI uris[]) {
-
+ public void add(boolean reblance, URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
URI uri = uris[i];
@@ -537,6 +531,7 @@ public class FanoutTransport implements CompositeTransport {
break;
}
}
+
if (!match) {
FanoutTransportHandler th = new FanoutTransportHandler(uri);
transports.add(th);
@@ -544,12 +539,10 @@ public class FanoutTransport implements CompositeTransport {
}
}
}
-
}
@Override
- public void remove(boolean rebalance,URI uris[]) {
-
+ public void remove(boolean rebalance, URI uris[]) {
synchronized (reconnectMutex) {
for (int i = 0; i < uris.length; i++) {
URI uri = uris[i];
@@ -567,13 +560,11 @@ public class FanoutTransport implements CompositeTransport {
}
}
}
-
}
@Override
public void reconnect(URI uri) throws IOException {
- add(true,new URI[]{uri});
-
+ add(true, new URI[] { uri });
}
@Override
@@ -585,12 +576,12 @@ public class FanoutTransport implements CompositeTransport {
public boolean isUpdateURIsSupported() {
return true;
}
+
@Override
- public void updateURIs(boolean reblance,URI[] uris) throws IOException {
- add(reblance,uris);
+ public void updateURIs(boolean reblance, URI[] uris) throws IOException {
+ add(reblance, uris);
}
-
@Override
public String getRemoteAddress() {
if (primary != null) {
@@ -625,7 +616,6 @@ public class FanoutTransport implements CompositeTransport {
return disposed;
}
-
@Override
public boolean isConnected() {
return connected;
@@ -643,4 +633,19 @@ public class FanoutTransport implements CompositeTransport {
}
return rc;
}
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return null;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
index 60c94af..8b00e27 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,16 +18,16 @@ package org.apache.activemq.transport.mock;
import java.io.IOException;
import java.net.URI;
+import java.security.cert.X509Certificate;
+
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFilter;
import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.wireformat.WireFormat;
-/**
- *
- */
public class MockTransport extends DefaultTransportListener implements Transport {
protected Transport next;
@@ -37,8 +37,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
this.next = next;
}
- /**
- */
+ @Override
public synchronized void setTransportListener(TransportListener channelListener) {
this.transportListener = channelListener;
if (channelListener == null) {
@@ -50,8 +49,10 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @see org.apache.activemq.Service#start()
- * @throws IOException if the next channel has not been set.
+ * @throws IOException
+ * if the next channel has not been set.
*/
+ @Override
public void start() throws Exception {
if (getNext() == null) {
throw new IOException("The next channel has not been set.");
@@ -65,6 +66,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @see org.apache.activemq.Service#stop()
*/
+ @Override
public void stop() throws Exception {
getNext().stop();
}
@@ -84,6 +86,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @return Returns the packetListener.
*/
+ @Override
public synchronized TransportListener getTransportListener() {
return transportListener;
}
@@ -93,18 +96,22 @@ public class MockTransport extends DefaultTransportListener implements Transport
return getNext().toString();
}
+ @Override
public void oneway(Object command) throws IOException {
getNext().oneway(command);
}
+ @Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
return getNext().asyncRequest(command, null);
}
+ @Override
public Object request(Object command) throws IOException {
return getNext().request(command);
}
+ @Override
public Object request(Object command, int timeout) throws IOException {
return getNext().request(command, timeout);
}
@@ -114,6 +121,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
getTransportListener().onException(error);
}
+ @Override
public <T> T narrow(Class<T> target) {
if (target.isAssignableFrom(getClass())) {
return target.cast(this);
@@ -131,6 +139,7 @@ public class MockTransport extends DefaultTransportListener implements Transport
setNext(filter);
}
+ @Override
public String getRemoteAddress() {
return getNext().getRemoteAddress();
}
@@ -138,35 +147,58 @@ public class MockTransport extends DefaultTransportListener implements Transport
/**
* @see org.apache.activemq.transport.Transport#isFaultTolerant()
*/
+ @Override
public boolean isFaultTolerant() {
return getNext().isFaultTolerant();
}
- public boolean isDisposed() {
- return getNext().isDisposed();
- }
-
- public boolean isConnected() {
- return getNext().isConnected();
+ @Override
+ public boolean isDisposed() {
+ return getNext().isDisposed();
}
- public void reconnect(URI uri) throws IOException {
- getNext().reconnect(uri);
- }
+ @Override
+ public boolean isConnected() {
+ return getNext().isConnected();
+ }
+ @Override
+ public void reconnect(URI uri) throws IOException {
+ getNext().reconnect(uri);
+ }
+
+ @Override
public int getReceiveCounter() {
return getNext().getReceiveCounter();
}
-
+ @Override
public boolean isReconnectSupported() {
return getNext().isReconnectSupported();
}
+ @Override
public boolean isUpdateURIsSupported() {
return getNext().isUpdateURIsSupported();
}
- public void updateURIs(boolean reblance,URI[] uris) throws IOException {
- getNext().updateURIs(reblance,uris);
+
+ @Override
+ public void updateURIs(boolean reblance, URI[] uris) throws IOException {
+ getNext().updateURIs(reblance, uris);
+ }
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return getNext().getPeerCertificates();
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ getNext().setPeerCertificates(certificates);
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return getNext().getWireFormat();
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
index 2b3953f..0c2fab9 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -14,7 +14,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.activemq.transport.tcp;
import java.io.IOException;
@@ -43,6 +42,7 @@ import org.apache.activemq.wireformat.WireFormat;
* unexpected situations may occur.
*/
public class SslTransport extends TcpTransport {
+
/**
* Connect to a remote node such as a Broker.
*
@@ -56,6 +56,7 @@ public class SslTransport extends TcpTransport {
* @throws UnknownHostException If TcpTransport throws.
* @throws IOException If TcpTransport throws.
*/
+ @SuppressWarnings({ "unchecked", "rawtypes" })
public SslTransport(WireFormat wireFormat, SSLSocketFactory socketFactory, URI remoteLocation, URI localLocation, boolean needClientAuth) throws IOException {
super(wireFormat, socketFactory, remoteLocation, localLocation);
if (this.socket != null) {
@@ -65,7 +66,7 @@ public class SslTransport extends TcpTransport {
// a single proxy to route to different messaging apps.
// On java 1.7 it seems like it can only be configured via reflection.
- // todo: find out if this will work on java 1.8
+ // TODO: find out if this will work on java 1.8
HashMap props = new HashMap();
props.put("host", remoteLocation.getHost());
IntrospectionSupport.setProperties(this.socket, props);
@@ -110,6 +111,7 @@ public class SslTransport extends TcpTransport {
/**
* @return peer certificate chain associated with the ssl socket
*/
+ @Override
public X509Certificate[] getPeerCertificates() {
SSLSocket sslSocket = (SSLSocket)this.socket;
@@ -120,7 +122,7 @@ public class SslTransport extends TcpTransport {
try {
clientCertChain = (X509Certificate[])sslSession.getPeerCertificates();
} catch (SSLPeerUnverifiedException e) {
- clientCertChain = null;
+ clientCertChain = null;
}
return clientCertChain;
@@ -133,5 +135,4 @@ public class SslTransport extends TcpTransport {
public String toString() {
return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
index 60fe283..04d1636 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
@@ -1,5 +1,5 @@
-/**
-gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -51,12 +52,11 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
- *
- * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
- *
*/
public class TcpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
+
private static final Logger LOG = LoggerFactory.getLogger(TcpTransport.class);
+
protected final URI remoteLocation;
protected final URI localLocation;
protected final WireFormat wireFormat;
@@ -754,4 +754,13 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
public WireFormat getWireFormat() {
return wireFormat;
}
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return null;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
index daa4860..d1ac088 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -20,7 +20,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.BindException;
import java.net.DatagramSocket;
-import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketException;
@@ -28,6 +27,7 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.DatagramChannel;
+import java.security.cert.X509Certificate;
import org.apache.activemq.Service;
import org.apache.activemq.command.Command;
@@ -47,10 +47,9 @@ import org.slf4j.LoggerFactory;
/**
* An implementation of the {@link Transport} interface using raw UDP
- *
- *
*/
public class UdpTransport extends TransportThreadSupport implements Transport, Service, Runnable {
+
private static final Logger LOG = LoggerFactory.getLogger(UdpTransport.class);
private static final int MAX_BIND_ATTEMPTS = 50;
@@ -112,6 +111,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* A one way asynchronous send
*/
+ @Override
public void oneway(Object command) throws IOException {
oneway(command, targetAddress);
}
@@ -130,6 +130,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* @return pretty print of 'this'
*/
+ @Override
public String toString() {
if (description != null) {
return description + port;
@@ -141,6 +142,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
/**
* reads packets from a Socket
*/
+ @Override
public void run() {
LOG.trace("Consumer thread starting for: " + toString());
while (!isStopped()) {
@@ -350,6 +352,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return host;
}
+ @Override
protected void doStart() throws Exception {
getCommandChannel().start();
@@ -387,7 +390,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
// down
// a previously bound socket, it can take a little while before we can
// bind it again.
- //
+ //
for (int i = 0; i < MAX_BIND_ATTEMPTS; i++) {
try {
socket.bind(localAddress);
@@ -419,6 +422,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return new InetSocketAddress(port);
}
+ @Override
protected void doStop(ServiceStopper stopper) throws Exception {
if (channel != null) {
channel.close();
@@ -457,6 +461,7 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
}
}
+ @Override
public String getRemoteAddress() {
if (targetAddress != null) {
return "" + targetAddress;
@@ -464,10 +469,20 @@ public class UdpTransport extends TransportThreadSupport implements Transport, S
return null;
}
+ @Override
public int getReceiveCounter() {
if (commandChannel == null) {
return 0;
}
return commandChannel.getReceiveCounter();
}
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return null;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
new file mode 100644
index 0000000..e15f86f
--- /dev/null
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/ws/WSTransport.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.activemq.transport.Transport;
+
+/**
+ * Interface for a WebSocket Transport which provide hooks that a servlet can
+ * use to pass along WebSocket data and events.
+ */
+public interface WSTransport extends Transport {
+
+ /**
+ * WS Transport output sink, used to give the WS Transport implementation
+ * a way to produce output back to the WS connection without coupling it
+ * to the implementation.
+ */
+ public interface WSTransportSink {
+
+ /**
+ * Called from the Transport when new outgoing String data is ready.
+ *
+ * @param data
+ * The newly prepared outgoing string data.
+ *
+ * @throws IOException if an error occurs or the socket doesn't support text data.
+ */
+ void onSocketOutboundText(String data) throws IOException;
+
+ /**
+ * Called from the Transport when new outgoing String data is ready.
+ *
+ * @param data
+ * The newly prepared outgoing string data.
+ *
+ * @throws IOException if an error occurs or the socket doesn't support text data.
+ */
+ void onSocketOutboundBinary(ByteBuffer data) throws IOException;
+ }
+
+ /**
+ * @return the WS sub-protocol that this transport is supplying.
+ */
+ String getSubProtocol();
+
+ /**
+ * Called to provide the WS with the output data sink.
+ */
+ void setTransportSink(WSTransportSink outputSink);
+
+ /**
+ * Called from the WebSocket framework when new incoming String data is received.
+ *
+ * @param data
+ * The newly received incoming data.
+ *
+ * @throws IOException if an error occurs or the socket doesn't support text data.
+ */
+ void onWebSocketText(String data) throws IOException;
+
+ /**
+ * Called from the WebSocket framework when new incoming Binary data is received.
+ *
+ * @param data
+ * The newly received incoming data.
+ *
+ * @throws IOException if an error occurs or the socket doesn't support binary data.
+ */
+ void onWebSocketBinary(ByteBuffer data) throws IOException;
+
+ /**
+ * Called from the WebSocket framework when the socket has been closed unexpectedly.
+ *
+ * @throws IOException if an error while processing the close.
+ */
+ void onWebSocketClosed() throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml
index 5b288c5..e15e076 100755
--- a/activemq-http/pom.xml
+++ b/activemq-http/pom.xml
@@ -228,7 +228,7 @@
</plugins>
</build>
</profile>
- <profile>
+ <profile>
<id>activemq.tests-autoTransport</id>
<activation>
<property>
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
index 4132c7c..fdf85b3 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/BlockingQueueTransport.java
@@ -17,21 +17,22 @@
package org.apache.activemq.transport.http;
import java.io.IOException;
+import java.security.cert.X509Certificate;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.transport.TransportSupport;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
/**
* A server side HTTP based TransportChannel which processes incoming packets
* and adds outgoing packets onto a {@link Queue} so that they can be dispatched
* by the HTTP GET requests from the client.
- *
- *
*/
-public class BlockingQueueTransport extends TransportSupport {
+public class BlockingQueueTransport extends TransportSupport {
+
public static final long MAX_TIMEOUT = 30000L;
private BlockingQueue<Object> queue;
@@ -44,6 +45,7 @@ public class BlockingQueueTransport extends TransportSupport {
return queue;
}
+ @Override
public void oneway(Object command) throws IOException {
try {
boolean success = queue.offer(command, MAX_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -55,18 +57,35 @@ public class BlockingQueueTransport extends TransportSupport {
}
}
-
+ @Override
public String getRemoteAddress() {
return "blockingQueue_" + queue.hashCode();
}
+ @Override
protected void doStart() throws Exception {
}
+ @Override
protected void doStop(ServiceStopper stopper) throws Exception {
}
+ @Override
public int getReceiveCounter() {
return 0;
}
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return null;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return null;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
index c65dbb9..7f446c5 100755
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpClientTransport.java
@@ -20,6 +20,7 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
+import java.security.cert.X509Certificate;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -30,6 +31,7 @@ import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
@@ -396,4 +398,17 @@ public class HttpClientTransport extends HttpTransportSupport {
this.minSendAsCompressedSize = minSendAsCompressedSize;
}
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return null;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return getTextWireFormat();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java b/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
index 4daaf65..a8309b6 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/util/HttpTransportUtils.java
@@ -33,4 +33,25 @@ public class HttpTransportUtils {
remoteAddress.append(request.getRemotePort());
return remoteAddress.toString();
}
+
+ public static String generateWsRemoteAddress(HttpServletRequest request, String schemePrefix) {
+ if (request == null) {
+ throw new IllegalArgumentException("HttpServletRequest must not be null.");
+ }
+
+ StringBuilder remoteAddress = new StringBuilder();
+ String scheme = request.getScheme();
+ if (scheme != null && scheme.equalsIgnoreCase("https")) {
+ scheme = schemePrefix + "+wss://";
+ } else {
+ scheme = schemePrefix + "+ws://";
+ }
+
+ remoteAddress.append(scheme);
+ remoteAddress.append(request.getRemoteAddr());
+ remoteAddress.append(":");
+ remoteAddress.append(request.getRemotePort());
+
+ return remoteAddress.toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
index b8e0f8f..dd25a1d 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
@@ -147,7 +147,7 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
stompInactivityMonitor.onCommand(new KeepAliveInfo());
} else {
StompFrame frame = (StompFrame)wireFormat.unmarshal(new ByteSequence(data.getBytes("UTF-8")));
- frame.setTransportContext(getCertificates());
+ frame.setTransportContext(getPeerCertificates());
protocolConverter.onStompCommand(frame);
}
}
@@ -162,11 +162,13 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
return socketTransportStarted.getCount() == 0;
}
- public X509Certificate[] getCertificates() {
+ @Override
+ public X509Certificate[] getPeerCertificates() {
return certificates;
}
- public void setCertificates(X509Certificate[] certificates) {
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
this.certificates = certificates;
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
index 3ded98f..340505a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/StompWSConnection.java
@@ -129,10 +129,6 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
}
}
-
- /* (non-Javadoc)
- * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketClose(int, java.lang.String)
- */
@Override
public void onWebSocketClose(int statusCode, String reason) {
LOG.trace("STOMP WS Connection closed, code:{} message:{}", statusCode, reason);
@@ -140,15 +136,10 @@ public class StompWSConnection extends WebSocketAdapter implements WebSocketList
this.connection = null;
this.closeCode = statusCode;
this.closeMessage = reason;
-
}
- /* (non-Javadoc)
- * @see org.eclipse.jetty.websocket.api.WebSocketListener#onWebSocketConnect(org.eclipse.jetty.websocket.api.Session)
- */
@Override
- public void onWebSocketConnect(
- org.eclipse.jetty.websocket.api.Session session) {
+ public void onWebSocketConnect(org.eclipse.jetty.websocket.api.Session session) {
this.connection = session;
this.connectLatch.countDown();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
index bfcb5df..744685b 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportFactory.java
@@ -23,6 +23,8 @@ import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.util.IOExceptionSupport;
@@ -32,7 +34,9 @@ import org.apache.activemq.util.URISupport;
/**
* Factory for WebSocket (ws) transport
*/
-public class WSTransportFactory extends TransportFactory {
+public class WSTransportFactory extends TransportFactory implements BrokerServiceAware {
+
+ private BrokerService brokerService;
@Override
public TransportServer doBind(URI location) throws IOException {
@@ -42,6 +46,7 @@ public class WSTransportFactory extends TransportFactory {
Map<String, Object> httpOptions = IntrospectionSupport.extractProperties(options, "http.");
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "");
IntrospectionSupport.setProperties(result, transportOptions);
+ result.setBrokerService(brokerService);
result.setTransportOption(transportOptions);
result.setHttpOptions(httpOptions);
return result;
@@ -49,4 +54,9 @@ public class WSTransportFactory extends TransportFactory {
throw IOExceptionSupport.create(e);
}
}
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
new file mode 100644
index 0000000..7d3ba18
--- /dev/null
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/WSTransportProxy.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.ws.WSTransport.WSTransportSink;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.eclipse.jetty.websocket.api.Session;
+import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A proxy class that manages sending WebSocket events to the wrapped protocol level
+ * WebSocket Transport.
+ */
+public final class WSTransportProxy extends TransportSupport implements Transport, WebSocketListener, BrokerServiceAware, WSTransportSink {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WSTransportProxy.class);
+
+ private final int ORDERLY_CLOSE_TIMEOUT = 10;
+
+ private final ReentrantLock protocolLock = new ReentrantLock();
+ private final CountDownLatch socketTransportStarted = new CountDownLatch(1);
+ private final String remoteAddress;
+
+ private final Transport transport;
+ private final WSTransport wsTransport;
+ private Session session;
+
+ /**
+ * Create a WebSocket Transport Proxy instance that will pass
+ * along WebSocket event to the underlying protocol level transport.
+ *
+ * @param remoteAddress
+ * the provided remote address to report being connected to.
+ * @param transport
+ * The protocol level WebSocket Transport
+ */
+ public WSTransportProxy(String remoteAddress, Transport transport) {
+ this.remoteAddress = remoteAddress;
+ this.transport = transport;
+ this.wsTransport = transport.narrow(WSTransport.class);
+
+ if (wsTransport == null) {
+ throw new IllegalArgumentException("Provided Transport does not contains a WSTransport implementation");
+ } else {
+ wsTransport.setTransportSink(this);
+ }
+ }
+
+ /**
+ * @return the sub-protocol of the proxied transport.
+ */
+ public String getSubProtocol() {
+ return wsTransport.getSubProtocol();
+ }
+
+ /**
+ * Apply any configure Transport options on the wrapped Transport and its contained
+ * wireFormat instance.
+ */
+ public void setTransportOptions(Map<String, Object> options) {
+ Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
+
+ IntrospectionSupport.setProperties(transport, options);
+ IntrospectionSupport.setProperties(transport.getWireFormat(), wireFormatOptions);
+ }
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ if (transport instanceof BrokerServiceAware) {
+ ((BrokerServiceAware) transport).setBrokerService(brokerService);
+ }
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException {
+ protocolLock.lock();
+ try {
+ transport.oneway(command);
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ } finally {
+ protocolLock.unlock();
+ }
+ }
+
+ @Override
+ public X509Certificate[] getPeerCertificates() {
+ return transport.getPeerCertificates();
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) {
+ transport.setPeerCertificates(certificates);
+ }
+
+ @Override
+ public String getRemoteAddress() {
+ return remoteAddress;
+ }
+
+ @Override
+ public WireFormat getWireFormat() {
+ return transport.getWireFormat();
+ }
+
+ @Override
+ public int getReceiveCounter() {
+ return transport.getReceiveCounter();
+ }
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ transport.stop();
+ if (session != null && session.isOpen()) {
+ session.close();
+ }
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ socketTransportStarted.countDown();
+
+ transport.setTransportListener(getTransportListener());
+ transport.start();
+ }
+
+ //----- WebSocket methods being proxied to the WS Transport --------------//
+
+ @Override
+ public void onWebSocketBinary(byte[] payload, int offset, int length) {
+ if (!transportStartedAtLeastOnce()) {
+ LOG.debug("Waiting for WebSocket to be properly started...");
+ try {
+ socketTransportStarted.await();
+ } catch (InterruptedException e) {
+ LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+ }
+ }
+
+ protocolLock.lock();
+ try {
+ wsTransport.onWebSocketBinary(ByteBuffer.wrap(payload, offset, length));
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ } finally {
+ protocolLock.unlock();
+ }
+ }
+
+ @Override
+ public void onWebSocketText(String data) {
+ if (!transportStartedAtLeastOnce()) {
+ LOG.debug("Waiting for WebSocket to be properly started...");
+ try {
+ socketTransportStarted.await();
+ } catch (InterruptedException e) {
+ LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+ }
+ }
+
+ protocolLock.lock();
+ try {
+ wsTransport.onWebSocketText(data);
+ } catch (Exception e) {
+ onException(IOExceptionSupport.create(e));
+ } finally {
+ protocolLock.unlock();
+ }
+ }
+
+ @Override
+ public void onWebSocketClose(int statusCode, String reason) {
+ try {
+ if (protocolLock.tryLock() || protocolLock.tryLock(ORDERLY_CLOSE_TIMEOUT, TimeUnit.SECONDS)) {
+ LOG.debug("WebSocket closed: code[{}] message[{}]", statusCode, reason);
+ wsTransport.onWebSocketClosed();
+ }
+ } catch (Exception e) {
+ LOG.debug("Failed to close WebSocket cleanly", e);
+ } finally {
+ if (protocolLock.isHeldByCurrentThread()) {
+ protocolLock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public void onWebSocketConnect(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public void onWebSocketError(Throwable cause) {
+ onException(IOExceptionSupport.create(cause));
+ }
+
+ @Override
+ public void onSocketOutboundText(String data) throws IOException {
+ if (!transportStartedAtLeastOnce()) {
+ LOG.debug("Waiting for WebSocket to be properly started...");
+ try {
+ socketTransportStarted.await();
+ } catch (InterruptedException e) {
+ LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+ }
+ }
+
+ LOG.trace("WS Proxy sending string of size {} out", data.length());
+ session.getRemote().sendString(data);
+ }
+
+ @Override
+ public void onSocketOutboundBinary(ByteBuffer data) throws IOException {
+ if (!transportStartedAtLeastOnce()) {
+ LOG.debug("Waiting for WebSocket to be properly started...");
+ try {
+ socketTransportStarted.await();
+ } catch (InterruptedException e) {
+ LOG.warn("While waiting for WebSocket to be properly started, we got interrupted!! Should be okay, but you could see race conditions...");
+ }
+ }
+
+ LOG.trace("WS Proxy sending {} bytes out", data.remaining());
+ int limit = data.limit();
+ session.getRemote().sendBytes(data);
+
+ // Reset back to original limit and move position to match limit indicating
+ // that we read everything, the websocket sender clears the passed buffer
+ // which can make it look as if nothing was written.
+ data.limit(limit);
+ data.position(limit);
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ private boolean transportStartedAtLeastOnce() {
+ return socketTransportStarted.getCount() == 0;
+ }
+}