You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/30 19:14:23 UTC
[3/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-6339
https://issues.apache.org/jira/browse/AMQ-6339
Add support for AMQP client to connect using WebSockets.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/31c55f75
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/31c55f75
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/31c55f75
Branch: refs/heads/master
Commit: 31c55f75108b06020eb6206c52361b04f49656a9
Parents: 83827f2
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jun 17 16:26:52 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Jun 30 14:52:40 2016 -0400
----------------------------------------------------------------------
activemq-amqp/pom.xml | 45 ++
.../transport/amqp/AmqpProtocolConverter.java | 8 +-
.../transport/amqp/AmqpWSTransport.java | 143 ++++++
.../transport/amqp/AmqpWSTransportFactory.java | 88 ++++
.../transport/amqp/protocol/AmqpConnection.java | 7 +-
.../transport/amqp/sasl/AmqpAuthenticator.java | 4 +-
.../org/apache/activemq/transport/amqp+ws | 17 +
.../org/apache/activemq/transport/amqp+wss | 17 +
.../transport/amqp/AmqpAndMqttTest.java | 137 ++++++
.../transport/amqp/AmqpTestSupport.java | 49 ++
.../transport/amqp/JMSClientSslTest.java | 13 -
.../activemq/transport/amqp/JMSClientTest.java | 6 +-
.../transport/amqp/client/AmqpClient.java | 2 +-
.../amqp/client/AmqpClientTestSupport.java | 36 +-
.../transport/amqp/client/AmqpConnection.java | 10 +-
.../client/transport/NettyTcpTransport.java | 401 ++++++++++++++++
.../amqp/client/transport/NettyTransport.java | 370 +--------------
.../client/transport/NettyTransportFactory.java | 17 +-
.../transport/NettyTransportSslOptions.java | 2 +-
.../amqp/client/transport/NettyWSTransport.java | 470 +++++++++++++++++++
.../AmqpBrokerReuqestedHearbeatsTest.java | 18 +
.../AmqpClientRequestsHeartbeatsTest.java | 18 +
.../amqp/interop/AmqpConnectionsTest.java | 20 +
.../src/test/resources/log4j.properties | 2 +-
.../activemq/transport/vm/VMTransport.java | 17 +
.../apache/activemq/transport/Transport.java | 25 +
.../activemq/transport/TransportFilter.java | 42 +-
.../transport/failover/FailoverTransport.java | 28 +-
.../transport/fanout/FanoutTransport.java | 71 +--
.../activemq/transport/mock/MockTransport.java | 70 ++-
.../activemq/transport/tcp/SslTransport.java | 11 +-
.../activemq/transport/tcp/TcpTransport.java | 19 +-
.../activemq/transport/udp/UdpTransport.java | 25 +-
.../activemq/transport/ws/WSTransport.java | 95 ++++
activemq-http/pom.xml | 2 +-
.../transport/http/BlockingQueueTransport.java | 27 +-
.../transport/http/HttpClientTransport.java | 15 +
.../transport/util/HttpTransportUtils.java | 21 +
.../transport/ws/AbstractStompSocket.java | 8 +-
.../transport/ws/StompWSConnection.java | 11 +-
.../transport/ws/WSTransportFactory.java | 12 +-
.../activemq/transport/ws/WSTransportProxy.java | 270 +++++++++++
.../transport/ws/WSTransportServer.java | 19 +-
.../activemq/transport/ws/jetty9/WSServlet.java | 127 ++++-
.../transport/wss/WSSTransportFactory.java | 12 +-
.../activemq/transport/ws/MQTTWSConnection.java | 10 +-
.../ws/StompWSConnectionTimeoutTest.java | 4 -
.../transport/ws/WSTransportTestSupport.java | 16 +-
.../src/test/resources/log4j.properties | 4 +-
activemq-unit-tests/pom.xml | 4 -
.../activemq/conversions/AmqpAndMqttTest.java | 136 ------
.../activemq/transport/StubTransport.java | 25 +-
.../auto/AutoTransportConfigureTest.java | 30 +-
pom.xml | 2 +-
54 files changed, 2362 insertions(+), 696 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-amqp/pom.xml b/activemq-amqp/pom.xml
index 7c8320f..dba2dac 100644
--- a/activemq-amqp/pom.xml
+++ b/activemq-amqp/pom.xml
@@ -94,6 +94,21 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-http</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-mqtt</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty.websocket</groupId>
+ <artifactId>websocket-server</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<scope>test</scope>
@@ -123,6 +138,36 @@
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty-all-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>${netty-all-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-handler</artifactId>
+ <version>${netty-all-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http</artifactId>
+ <version>${netty-all-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport</artifactId>
+ <version>${netty-all-version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
index 6b9a178..9234bf5 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
@@ -22,13 +22,13 @@ import org.apache.activemq.command.Command;
/**
* Interface that defines the API for any AMQP protocol converter ised to
- * map AMQP mechanincs to ActiveMQ and back.
+ * map AMQP mechanics to ActiveMQ and back.
*/
public interface AmqpProtocolConverter {
/**
* A new incoming data packet from the remote peer is handed off to the
- * protocol converter for porcessing. The type can vary and be either an
+ * protocol converter for processing. The type can vary and be either an
* AmqpHeader at the handshake phase or a byte buffer containing the next
* incoming frame data from the remote.
*
@@ -70,9 +70,9 @@ public interface AmqpProtocolConverter {
* empty frames or closing connections due to remote end being inactive
* for to long.
*
- * @returns the amount of milliseconds to wait before performaing another check.
+ * @returns the amount of milliseconds to wait before performing another check.
*
- * @throws IOException if an error occurs on writing heatbeats to the wire.
+ * @throws IOException if an error occurs on writing heart-beats to the wire.
*/
long keepAlive() throws IOException;
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
new file mode 100644
index 0000000..2ec3a09
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransport.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.security.cert.X509Certificate;
+
+import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.amqp.AmqpFrameParser.AMQPFrameSink;
+import org.apache.activemq.transport.ws.WSTransport;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * An AMQP based WebSocket transport implementation.
+ *
+ * Inbound binary WebSocket payloads are handed to an {@link AmqpFrameParser}
+ * that is constructed with this object as its frame sink; frames arriving in
+ * {@link #onFrame(Object)} are passed on through doConsume(). Outbound
+ * {@link ByteBuffer} commands are written to the sink assigned with
+ * {@link #setTransportSink(WSTransportSink)}. Text payloads and socket close
+ * events are reported through onException() instead of being processed.
+ */
+public class AmqpWSTransport extends TransportSupport implements WSTransport, AMQPFrameSink {
+
+ private final AmqpFrameParser frameReader = new AmqpFrameParser(this);
+ private final URI remoteLocation;
+
+ private WSTransportSink outputSink;
+ private int receiveCounter; // NOTE(review): never written in this class, so getReceiveCounter() keeps returning 0
+ private X509Certificate[] certificates;
+
+ /**
+ * Create a new Transport instance.
+ *
+ * @param location
+ * the remote location where the client connection is from.
+ * @param wireFormat
+ * the WireFormat instance that configures this Transport. It is cast
+ * to AmqpWireFormat, so any other implementation fails with a
+ * ClassCastException.
+ */
+ public AmqpWSTransport(URI location, WireFormat wireFormat) {
+ super();
+
+ remoteLocation = location;
+ frameReader.setWireFormat((AmqpWireFormat) wireFormat);
+ }
+
+ @Override
+ public void setTransportSink(WSTransportSink outputSink) { // must be called before start(); see doStart()
+ this.outputSink = outputSink;
+ }
+
+ @Override
+ public void oneway(Object command) throws IOException { // only ByteBuffer commands are accepted for output
+ if (command instanceof ByteBuffer) {
+ outputSink.onSocketOutboundBinary((ByteBuffer) command); // NOTE(review): no null check on outputSink here
+ } else {
+ throw new IOException("Unexpected output command.");
+ }
+ }
+
+ @Override
+ public String getRemoteAddress() { // ASCII form of the URI given to the constructor
+ return remoteLocation.toASCIIString();
+ }
+
+ @Override
+ public int getReceiveCounter() {
+ return receiveCounter;
+ }
+
+ @Override
+ public X509Certificate[] getPeerCertificates() { // null until setPeerCertificates() has been called
+ return certificates;
+ }
+
+ @Override
+ public void setPeerCertificates(X509Certificate[] certificates) { // stores the array reference as given, no copy is made
+ this.certificates = certificates;
+ }
+
+ @Override
+ public String getSubProtocol() { // fixed sub-protocol identifier for this transport
+ return "amqp";
+ }
+
+ @Override
+ public WireFormat getWireFormat() { // the wire format lives on the frame parser, where the constructor stored it
+ return frameReader.getWireFormat();
+ }
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ // Currently nothing needed here since we have no async workers.
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ if (outputSink == null) { // fail fast: oneway() has nowhere to write without a sink
+ throw new IllegalStateException("Transport started before output sink assigned.");
+ }
+
+ // Currently nothing needed here since we have no async workers.
+ }
+
+ //----- WebSocket event hooks --------------------------------------------//
+
+ @Override
+ public void onWebSocketText(String data) throws IOException { // text payloads are rejected: reported via onException(), not thrown
+ onException(new IOException("Illegal text content receive on AMQP WebSocket channel."));
+ }
+
+ @Override
+ public void onWebSocketBinary(ByteBuffer data) throws IOException {
+ try {
+ frameReader.parse(data);
+ } catch (Exception e) { // any parser failure is converted to an IOException for the caller
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ @Override
+ public void onWebSocketClosed() throws IOException { // every close is reported as an error via onException()
+ onException(new IOException("Unexpected close of AMQP WebSocket channel."));
+ }
+
+ //----- AMQP Frame Data event hook ---------------------------------------//
+
+ @Override
+ public void onFrame(Object frame) { // AMQPFrameSink callback: pass the frame on through doConsume()
+ doConsume(frame);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java
new file mode 100644
index 0000000..b85a827
--- /dev/null
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpWSTransportFactory.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.util.Map;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.wireformat.WireFormat;
+
+/**
+ * Factory for creating WebSocket aware AMQP Transports.
+ *
+ * {@link #createTransport(URI, WireFormat)} builds the AmqpWSTransport and
+ * {@link #compositeConfigure(Transport, WireFormat, Map)} layers the AMQP
+ * filter and inactivity monitor on top of it. {@link #doBind(URI)} always
+ * fails with an IOException.
+ */
+public class AmqpWSTransportFactory extends TransportFactory implements BrokerServiceAware {
+
+ private BrokerService brokerService = null; // stays null unless setBrokerService() is called
+
+ @Override
+ protected String getDefaultWireFormatType() {
+ return "amqp";
+ }
+
+ @Override
+ public TransportServer doBind(URI location) throws IOException { // unconditionally rejects the bind request
+ throw new IOException("doBind() method not implemented! No Server over WS implemented.");
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+ public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
+ AmqpTransportFilter amqpTransport = new AmqpTransportFilter(transport, format, brokerService);
+
+ Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat."); // options carrying the "wireFormat." prefix
+
+ IntrospectionSupport.setProperties(amqpTransport, options);
+ IntrospectionSupport.setProperties(amqpTransport.getWireFormat(), wireFormatOptions);
+
+ // Now wrap the filter with the monitor
+ transport = createInactivityMonitor(amqpTransport, format);
+ IntrospectionSupport.setProperties(transport, options); // the same options are also applied to the monitor
+
+ return super.compositeConfigure(transport, format, options);
+ }
+
+ /**
+ * Factory method to create a new transport
+ *
+ * @param location
+ * handed to the new AmqpWSTransport as its location.
+ * @param wireFormat
+ * handed to the new AmqpWSTransport.
+ *
+ * @return a new AmqpWSTransport instance.
+ *
+ * @throws IOException
+ * @throws UnknownHostException
+ */
+ @Override
+ protected Transport createTransport(URI location, WireFormat wireFormat) throws MalformedURLException, UnknownHostException, IOException {
+ return new AmqpWSTransport(location, wireFormat);
+ }
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+
+ protected Transport createInactivityMonitor(AmqpTransportFilter transport, WireFormat format) { // wraps the filter and registers the monitor on it
+ AmqpInactivityMonitor monitor = new AmqpInactivityMonitor(transport, format);
+ transport.setInactivityMonitor(monitor);
+ return monitor;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
index 8b86132..aa9b577 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java
@@ -309,7 +309,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
while (!done) {
ByteBuffer toWrite = protonTransport.getOutputBuffer();
if (toWrite != null && toWrite.hasRemaining()) {
- LOG.trace("Sending {} bytes out", toWrite.limit());
+ LOG.trace("Server: Sending {} bytes out", toWrite.limit());
amqpTransport.sendToAmqp(toWrite);
protonTransport.outputConsumed();
} else {
@@ -356,6 +356,8 @@ public class AmqpConnection implements AmqpProtocolConverter {
return;
}
+ LOG.trace("Server: Received from client: {} bytes", frame.getLength());
+
while (frame.length > 0) {
try {
int count = protonTransport.input(frame.data, frame.offset, frame.length);
@@ -386,7 +388,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
Event event = null;
while ((event = eventCollector.peek()) != null) {
if (amqpTransport.isTrace()) {
- LOG.trace("Processing event: {}", event.getType());
+ LOG.trace("Server: Processing event: {}", event.getType());
}
switch (event.getType()) {
case CONNECTION_REMOTE_OPEN:
@@ -484,7 +486,6 @@ public class AmqpConnection implements AmqpProtocolConverter {
protonConnection.close();
} else {
-
if (amqpTransport.isUseInactivityMonitor() && amqpWireFormat.getIdleTimeout() > 0) {
LOG.trace("Connection requesting Idle timeout of: {} mills", amqpWireFormat.getIdleTimeout());
protonTransport.setIdleTimeout(amqpWireFormat.getIdleTimeout());
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
index 379c5f9..2309374 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/sasl/AmqpAuthenticator.java
@@ -54,14 +54,14 @@ public class AmqpAuthenticator {
}
/**
- * @return true if the SASL exchange has conpleted, regardless of success.
+ * @return true if the SASL exchange has completed, regardless of success.
*/
public boolean isDone() {
return sasl.getOutcome() != Sasl.SaslOutcome.PN_SASL_NONE;
}
/**
- * @return the list of all SASL mechanisms that are supported curretnly.
+ * @return the list of all SASL mechanisms that are supported currently.
*/
public String[] getSupportedMechanisms() {
return mechanisms;
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws
new file mode 100644
index 0000000..cfadf68
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ws
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss
new file mode 100644
index 0000000..cfadf68
--- /dev/null
+++ b/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+wss
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.transport.amqp.AmqpWSTransportFactory
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java
new file mode 100644
index 0000000..ac7a199
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpAndMqttTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AmqpAndMqttTest { // checks that a message published over MQTT reaches an AMQP JMS consumer on the same broker
+
+ protected BrokerService broker;
+ private TransportConnector amqpConnector;
+ private TransportConnector mqttConnector;
+
+ @Before
+ public void setUp() throws Exception {
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ broker = null;
+ }
+ }
+
+ protected BrokerService createBroker() throws Exception { // returns a configured but not yet started broker
+ BrokerService broker = new BrokerService();
+ broker.setPersistent(false);
+ broker.setUseJmx(false);
+ broker.setAdvisorySupport(false);
+ broker.setSchedulerSupport(false);
+
+ amqpConnector = broker.addConnector("amqp://0.0.0.0:0"); // port 0 in the URI; the port to use is read back from getConnectUri()
+ mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
+
+ return broker;
+ }
+
+ @Test(timeout = 60000)
+ public void testFromMqttToAmqp() throws Exception {
+ Connection amqp = createAmqpConnection();
+ Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO")); // consumer is created before the MQTT publish below
+
+ final BlockingConnection mqtt = createMQTTConnection().blockingConnection();
+ mqtt.connect();
+ byte[] payload = bytes("Hello World");
+ mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
+ mqtt.disconnect();
+
+ Message msg = consumer.receive(1000 * 5); // wait at most 5 seconds for the message
+ assertNotNull(msg);
+ assertTrue(msg instanceof BytesMessage);
+
+ BytesMessage bmsg = (BytesMessage) msg;
+ byte[] actual = new byte[(int) bmsg.getBodyLength()];
+ bmsg.readBytes(actual);
+ assertTrue(Arrays.equals(actual, payload)); // body must match the published bytes exactly
+ amqp.close(); // NOTE(review): not reached when an assertion above fails
+ }
+
+ private byte[] bytes(String value) { // UTF-8 encodes value; the checked exception is rethrown unchecked
+ try {
+ return value.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected MQTT createMQTTConnection() throws Exception { // MQTT client settings pointing at the broker's MQTT connector port
+ MQTT mqtt = new MQTT();
+ mqtt.setConnectAttemptsMax(1);
+ mqtt.setReconnectAttemptsMax(0);
+ mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
+ return mqtt;
+ }
+
+ public Connection createAmqpConnection() throws Exception { // returns a JMS connection that has already been started
+
+ String amqpURI = "amqp://localhost:" + amqpConnector.getConnectUri().getPort();
+
+ final JmsConnectionFactory factory = new JmsConnectionFactory(amqpURI);
+
+ factory.setUsername("admin");
+ factory.setPassword("password");
+
+ final Connection connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) { // asynchronous connection errors are only printed, they do not fail the test
+ exception.printStackTrace();
+ }
+ });
+
+ connection.start();
+ return connection;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
index af6a63f..12c0a17 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp;
import java.io.File;
import java.io.IOException;
+import java.net.ServerSocket;
import java.net.URI;
import java.security.SecureRandom;
import java.util.Set;
@@ -33,6 +34,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.net.ServerSocketFactory;
import javax.net.ssl.KeyManager;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
@@ -79,6 +81,11 @@ public class AmqpTestSupport {
protected URI amqpNioPlusSslURI;
protected int amqpNioPlusSslPort;
+ protected URI amqpWsURI;
+ protected int amqpWsPort;
+ protected URI amqpWssURI;
+ protected int amqpWssPort;
+
protected URI autoURI;
protected int autoPort;
protected URI autoSslURI;
@@ -213,6 +220,20 @@ public class AmqpTestSupport {
autoNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using auto+nio+ssl port " + autoNioPlusSslPort);
}
+ if (isUseWsConnector()) {
+ connector = brokerService.addConnector(
+ "ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
+ amqpWsPort = connector.getConnectUri().getPort();
+ amqpWsURI = connector.getPublishableConnectURI();
+ LOG.debug("Using amqp+ws port " + amqpWsPort);
+ }
+ if (isUseWssConnector()) {
+ connector = brokerService.addConnector(
+ "wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
+ amqpWssPort = connector.getConnectUri().getPort();
+ amqpWssURI = connector.getPublishableConnectURI();
+ LOG.debug("Using amqp+wss port " + amqpWssPort);
+ }
}
protected boolean isPersistent() {
@@ -263,6 +284,14 @@ public class AmqpTestSupport {
return false;
}
+ protected boolean isUseWsConnector() { // overridable switch for the "ws://" connector; off by default
+ return false;
+ }
+
+ protected boolean isUseWssConnector() { // overridable switch for the "wss://" connector; off by default
+ return false;
+ }
+
protected String getAmqpTransformer() {
return "jms";
}
@@ -355,6 +384,26 @@ public class AmqpTestSupport {
return name.getMethodName();
}
+ protected int getProxyPort(int proxyPort) { // non-zero input is returned unchanged; 0 triggers a free-port probe
+ if (proxyPort == 0) {
+ ServerSocket ss = null;
+ try {
+ ss = ServerSocketFactory.getDefault().createServerSocket(0); // bind to port 0 and read back the port actually assigned
+ proxyPort = ss.getLocalPort();
+ } catch (IOException e) { // ignore: proxyPort stays 0 when the probe socket cannot be opened
+ } finally {
+ try {
+ if (ss != null ) {
+ ss.close(); // NOTE(review): port is released here, so it could be taken again before the caller binds it
+ }
+ } catch (IOException e) { // ignore: a failed close of the probe socket is not reported
+ }
+ }
+ }
+
+ return proxyPort;
+ }
+
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
ObjectName brokerViewMBean = new ObjectName(
"org.apache.activemq:type=Broker,brokerName=localhost");
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
index a8acb7d..b0bd3a7 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientSslTest.java
@@ -17,13 +17,7 @@
package org.apache.activemq.transport.amqp;
import java.net.URI;
-import java.security.SecureRandom;
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManager;
-
-import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,13 +27,6 @@ import org.slf4j.LoggerFactory;
public class JMSClientSslTest extends JMSClientTest {
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientSslTest.class);
- @BeforeClass
- public static void beforeClass() throws Exception {
- SSLContext ctx = SSLContext.getInstance("TLS");
- ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());
- SSLContext.setDefault(ctx);
- }
-
@Override
protected URI getBrokerURI() {
return amqpSslURI;
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
index 647ff13..87dc9ee 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
@@ -57,7 +57,7 @@ import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
-import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
@@ -1180,8 +1180,8 @@ public class JMSClientTest extends JMSClientTestSupport {
@Test(timeout = 60000)
public void testZeroPrefetchWithTwoConsumers() throws Exception {
- connection = createConnection();
- ((JmsConnection)connection).getPrefetchPolicy().setAll(0);
+ JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=0"));
+ connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
index 738b0eb..78b1aa0 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClient.java
@@ -21,8 +21,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory;
+import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.qpid.proton.amqp.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
index 5504954..123f86c 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpClientTestSupport.java
@@ -51,12 +51,12 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
@Override
protected boolean isUseTcpConnector() {
- return !isUseSSL() && !connectorScheme.contains("nio");
+ return !isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("ws");
}
@Override
protected boolean isUseSslConnector() {
- return isUseSSL() && !connectorScheme.contains("nio");
+ return isUseSSL() && !connectorScheme.contains("nio") && !connectorScheme.contains("wss");
}
@Override
@@ -69,13 +69,33 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
return isUseSSL() && connectorScheme.contains("nio");
}
+    /**
+     * @return true when SSL is not in use and the connector scheme contains "ws".
+     */
+    @Override
+    protected boolean isUseWsConnector() {
+        if (isUseSSL()) {
+            return false;
+        }
+
+        return connectorScheme.contains("ws");
+    }
+
+    /**
+     * @return true when SSL is in use and the connector scheme contains "wss".
+     */
+    @Override
+    protected boolean isUseWssConnector() {
+        if (!isUseSSL()) {
+            return false;
+        }
+
+        return connectorScheme.contains("wss");
+    }
+
public URI getBrokerAmqpConnectionURI() {
+ boolean webSocket = false;
+
try {
int port = 0;
switch (connectorScheme) {
case "amqp":
port = this.amqpPort;
break;
+ case "amqp+ws":
+ port = this.amqpWsPort;
+ webSocket = true;
+ break;
+ case "amqp+wss":
+ port = this.amqpWssPort;
+ webSocket = true;
+ break;
case "amqp+ssl":
port = this.amqpSslPort;
break;
@@ -92,9 +112,17 @@ public class AmqpClientTestSupport extends AmqpTestSupport {
String uri = null;
if (isUseSSL()) {
- uri = "ssl://127.0.0.1:" + port;
+ if (webSocket) {
+ uri = "wss://127.0.0.1:" + port;
+ } else {
+ uri = "ssl://127.0.0.1:" + port;
+ }
} else {
- uri = "tcp://127.0.0.1:" + port;
+ if (webSocket) {
+ uri = "ws://127.0.0.1:" + port;
+ } else {
+ uri = "tcp://127.0.0.1:" + port;
+ }
}
if (!getAmqpConnectionURIOptions().isEmpty()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
index acda553..85a1d22 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java
@@ -34,7 +34,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.transport.InactivityIOException;
import org.apache.activemq.transport.amqp.client.sasl.SaslAuthenticator;
-import org.apache.activemq.transport.amqp.client.transport.NettyTransport;
import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener;
import org.apache.activemq.transport.amqp.client.util.AsyncResult;
import org.apache.activemq.transport.amqp.client.util.ClientFuture;
@@ -79,7 +78,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private final AtomicLong sessionIdGenerator = new AtomicLong();
private final AtomicLong txIdGenerator = new AtomicLong();
private final Collector protonCollector = new CollectorImpl();
- private final NettyTransport transport;
+ private final org.apache.activemq.transport.amqp.client.transport.NettyTransport transport;
private final Transport protonTransport = Transport.Factory.create();
private final String username;
@@ -103,7 +102,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
private long closeTimeout = DEFAULT_CLOSE_TIMEOUT;
private long drainTimeout = DEFAULT_DRAIN_TIMEOUT;
- public AmqpConnection(NettyTransport transport, String username, String password) {
+ public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, String password) {
setEndpoint(Connection.Factory.create());
getEndpoint().collect(protonCollector);
@@ -490,7 +489,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
@Override
public void run() {
ByteBuffer source = incoming.nioBuffer();
- LOG.trace("Received from Broker {} bytes:", source.remaining());
+ LOG.trace("Client Received from Broker {} bytes:", source.remaining());
if (protonTransport.isClosed()) {
LOG.debug("Ignoring incoming data because transport is closed");
@@ -520,6 +519,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
@Override
public void onTransportClosed() {
LOG.debug("The transport has unexpectedly closed");
+ failed(getOpenAbortException());
}
@Override
@@ -612,7 +612,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
Event protonEvent = null;
while ((protonEvent = protonCollector.peek()) != null) {
if (!protonEvent.getType().equals(Type.TRANSPORT)) {
- LOG.trace("New Proton Event: {}", protonEvent.getType());
+ LOG.trace("Client: New Proton Event: {}", protonEvent.getType());
}
AmqpEventSink amqpEventSink = null;
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
new file mode 100644
index 0000000..886ed4b
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTcpTransport.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.client.transport;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.FixedRecvByteBufAllocator;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
+/**
+ * TCP based transport that uses Netty as the underlying IO layer.
+ * <p>
+ * When the remote URI scheme is {@code ssl} an {@link SslHandler} is added to the channel
+ * pipeline and the transport only reports itself connected once the SSL handshake has
+ * completed; otherwise it is connected as soon as the socket connect succeeds.
+ */
+public class NettyTcpTransport implements NettyTransport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransport.class);
+
+    // Arguments for EventLoopGroup.shutdownGracefully(), both in milliseconds.
+    private static final int QUIET_PERIOD = 20;
+    private static final int SHUTDOWN_TIMEOUT = 100;
+
+    protected Bootstrap bootstrap;
+    protected EventLoopGroup group;
+    protected Channel channel;
+    protected NettyTransportListener listener;
+    protected NettyTransportOptions options;
+    protected final URI remote;
+    protected boolean secure;
+
+    private final AtomicBoolean connected = new AtomicBoolean();
+    private final AtomicBoolean closed = new AtomicBoolean();
+    // Released by connectionEstablished() / connectionFailed() to wake up connect().
+    private final CountDownLatch connectLatch = new CountDownLatch(1);
+    // Reason the connect attempt failed, rethrown from connect().
+    private IOException failureCause;
+    // First error seen while not connected, replayed once connect() succeeds.
+    private Throwable pendingFailure;
+
+    /**
+     * Create a new transport instance
+     *
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(URI remoteLocation, NettyTransportOptions options) {
+        this(null, remoteLocation, options);
+    }
+
+    /**
+     * Create a new transport instance
+     *
+     * @param listener
+     *        the TransportListener that will receive events from this Transport.
+     * @param remoteLocation
+     *        the URI that defines the remote resource to connect to.
+     * @param options
+     *        the transport options used to configure the socket connection.
+     */
+    public NettyTcpTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
+        this.options = options;
+        this.listener = listener;
+        this.remote = remoteLocation;
+        this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
+    }
+
+    /**
+     * Connects to the remote peer and blocks the caller until the attempt has succeeded
+     * or failed.
+     *
+     * @throws IOException
+     *         if the connect attempt fails or is cancelled, or if the calling thread is
+     *         interrupted while waiting for the outcome.
+     * @throws IllegalStateException
+     *         if no transport listener has been set.
+     */
+    @Override
+    public void connect() throws IOException {
+
+        if (listener == null) {
+            throw new IllegalStateException("A transport listener must be set before connection attempts.");
+        }
+
+        group = new NioEventLoopGroup(1);
+
+        bootstrap = new Bootstrap();
+        bootstrap.group(group);
+        bootstrap.channel(NioSocketChannel.class);
+        bootstrap.handler(new ChannelInitializer<Channel>() {
+
+            @Override
+            public void initChannel(Channel connectedChannel) throws Exception {
+                configureChannel(connectedChannel);
+            }
+        });
+
+        configureNetty(bootstrap, getTransportOptions());
+
+        ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
+        future.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+                if (future.isSuccess()) {
+                    handleConnected(future.channel());
+                } else if (future.isCancelled()) {
+                    connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
+                } else {
+                    connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
+                }
+            }
+        });
+
+        try {
+            connectLatch.await();
+        } catch (InterruptedException ex) {
+            LOG.debug("Transport connection was interrupted.");
+            // Restore the interrupt status so code further up the stack can still see it;
+            // Thread.interrupted() would clear the flag instead of setting it.
+            Thread.currentThread().interrupt();
+            failureCause = IOExceptionSupport.create(ex);
+        }
+
+        if (failureCause != null) {
+            // Close out any Netty resources now as they are no longer needed.
+            if (channel != null) {
+                channel.close().syncUninterruptibly();
+                channel = null;
+            }
+            if (group != null) {
+                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+                group = null;
+            }
+
+            throw failureCause;
+        } else {
+            // Connected, allow any held async error to fire now and close the transport.
+            channel.eventLoop().execute(new Runnable() {
+
+                @Override
+                public void run() {
+                    if (pendingFailure != null) {
+                        channel.pipeline().fireExceptionCaught(pendingFailure);
+                    }
+                }
+            });
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return connected.get();
+    }
+
+    @Override
+    public boolean isSSL() {
+        return secure;
+    }
+
+    /**
+     * Closes the channel and shuts down the event loop group. Only the first call has any
+     * effect, later calls return immediately.
+     */
+    @Override
+    public void close() throws IOException {
+        if (closed.compareAndSet(false, true)) {
+            connected.set(false);
+            if (channel != null) {
+                channel.close().syncUninterruptibly();
+            }
+            if (group != null) {
+                group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    /**
+     * Allocates an IO buffer from the channel's allocator whose initial and maximum
+     * capacity are both {@code size}.
+     *
+     * @throws IOException
+     *         if the transport is not connected.
+     */
+    @Override
+    public ByteBuf allocateSendBuffer(int size) throws IOException {
+        checkConnected();
+        return channel.alloc().ioBuffer(size, size);
+    }
+
+    /**
+     * Writes and flushes the given buffer to the channel; a buffer with no readable bytes
+     * is not written.
+     *
+     * @throws IOException
+     *         if the transport is not connected.
+     */
+    @Override
+    public void send(ByteBuf output) throws IOException {
+        checkConnected();
+        int length = output.readableBytes();
+        if (length == 0) {
+            return;
+        }
+
+        LOG.trace("Attempted write of: {} bytes", length);
+
+        channel.writeAndFlush(output);
+    }
+
+    @Override
+    public NettyTransportListener getTransportListener() {
+        return listener;
+    }
+
+    @Override
+    public void setTransportListener(NettyTransportListener listener) {
+        this.listener = listener;
+    }
+
+    /**
+     * @return the configured options, falling back to the shared default instance that
+     *         matches the transport type (SSL or plain) when none were supplied.
+     */
+    @Override
+    public NettyTransportOptions getTransportOptions() {
+        if (options == null) {
+            if (isSSL()) {
+                options = NettyTransportSslOptions.INSTANCE;
+            } else {
+                options = NettyTransportOptions.INSTANCE;
+            }
+        }
+
+        return options;
+    }
+
+    @Override
+    public URI getRemoteLocation() {
+        return remote;
+    }
+
+    /**
+     * @return the local principal of the SSL session taken from the pipeline's SslHandler.
+     *
+     * @throws UnsupportedOperationException
+     *         if this transport is not an SSL transport.
+     */
+    @Override
+    public Principal getLocalPrincipal() {
+        if (!isSSL()) {
+            throw new UnsupportedOperationException("Not connected to a secure channel");
+        }
+
+        SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
+
+        return sslHandler.engine().getSession().getLocalPrincipal();
+    }
+
+    //----- Internal implementation details, can be overridden as needed --//
+
+    protected String getRemoteHost() {
+        return remote.getHost();
+    }
+
+    /**
+     * @return the port from the remote URI, or the default SSL / TCP port from the
+     *         transport options when the URI does not carry a positive port.
+     */
+    protected int getRemotePort() {
+        int port = remote.getPort();
+
+        if (port <= 0) {
+            if (isSSL()) {
+                port = getSslOptions().getDefaultSslPort();
+            } else {
+                port = getTransportOptions().getDefaultTcpPort();
+            }
+        }
+
+        return port;
+    }
+
+    /**
+     * Applies the socket level settings from the given options to the bootstrap. Send and
+     * receive buffer sizes and the traffic class are only applied when not set to -1.
+     */
+    protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
+        bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
+        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
+        bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
+        bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
+        bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
+
+        if (options.getSendBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
+        }
+
+        if (options.getReceiveBufferSize() != -1) {
+            bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
+            bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
+        }
+
+        if (options.getTrafficClass() != -1) {
+            bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
+        }
+    }
+
+    /**
+     * Builds the channel pipeline: an SslHandler first when SSL is in use, followed by the
+     * handler that forwards channel events to the transport listener.
+     */
+    protected void configureChannel(final Channel channel) throws Exception {
+        if (isSSL()) {
+            SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
+            // For SSL the outcome of the handshake, not of the socket connect, decides
+            // whether the connection attempt succeeded.
+            sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                    if (future.isSuccess()) {
+                        LOG.trace("SSL Handshake has completed: {}", channel);
+                        connectionEstablished(channel);
+                    } else {
+                        LOG.trace("SSL Handshake has failed: {}", channel);
+                        connectionFailed(channel, IOExceptionSupport.create(future.cause()));
+                    }
+                }
+            });
+
+            channel.pipeline().addLast(sslHandler);
+        }
+
+        channel.pipeline().addLast(new NettyTcpTransportHandler());
+    }
+
+    /**
+     * Called when the socket connect has succeeded. Plain connections are established at
+     * this point, SSL connections wait for the handshake listener instead.
+     */
+    protected void handleConnected(final Channel channel) throws Exception {
+        if (!isSSL()) {
+            connectionEstablished(channel);
+        }
+    }
+
+    //----- State change handlers and checks ---------------------------------//
+
+    /**
+     * Called when the transport has successfully connected and is ready for use.
+     */
+    protected void connectionEstablished(Channel connectedChannel) {
+        channel = connectedChannel;
+        connected.set(true);
+        connectLatch.countDown();
+    }
+
+    /**
+     * Called when the transport connection failed and an error should be returned.
+     *
+     * @param failedChannel
+     *        The Channel instance that failed.
+     * @param cause
+     *        An IOException that describes the cause of the failed connection.
+     */
+    protected void connectionFailed(Channel failedChannel, IOException cause) {
+        failureCause = IOExceptionSupport.create(cause);
+        channel = failedChannel;
+        connected.set(false);
+        connectLatch.countDown();
+    }
+
+    private NettyTransportSslOptions getSslOptions() {
+        return (NettyTransportSslOptions) getTransportOptions();
+    }
+
+    private void checkConnected() throws IOException {
+        if (!connected.get()) {
+            throw new IOException("Cannot send to a non-connected transport.");
+        }
+    }
+
+    //----- Handle connection events -----------------------------------------//
+
+    /**
+     * Last handler in the pipeline, forwards inbound data, channel closure and errors to
+     * the transport listener.
+     */
+    private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+        @Override
+        public void channelActive(ChannelHandlerContext context) throws Exception {
+            LOG.trace("Channel has become active! Channel is {}", context.channel());
+        }
+
+        @Override
+        public void channelInactive(ChannelHandlerContext context) throws Exception {
+            LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
+            // Only report a closure the transport did not initiate itself, and only once.
+            if (connected.compareAndSet(true, false) && !closed.get()) {
+                LOG.trace("Firing onTransportClosed listener");
+                listener.onTransportClosed();
+            }
+        }
+
+        @Override
+        public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
+            LOG.trace("Exception on channel! Channel is {}", context.channel());
+            if (connected.compareAndSet(true, false) && !closed.get()) {
+                LOG.trace("Firing onTransportError listener");
+                if (pendingFailure != null) {
+                    listener.onTransportError(pendingFailure);
+                } else {
+                    listener.onTransportError(cause);
+                }
+            } else {
+                // Hold the first failure for later dispatch if connect succeeds.
+                // This will then trigger disconnect using the first error reported.
+                // The field starts out null, so it must be captured while it is still
+                // null; later errors do not replace the first one.
+                if (pendingFailure == null) {
+                    LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
+                    pendingFailure = cause;
+                }
+            }
+        }
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
+            LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
+            listener.onData(buffer);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
index 4084780..48be3a2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -16,375 +16,37 @@
*/
package org.apache.activemq.transport.amqp.client.transport;
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.FixedRecvByteBufAllocator;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.activemq.transport.amqp.client.util.IOExceptionSupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBuf;
/**
- * TCP based transport that uses Netty as the underlying IO layer.
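+ * Common contract for the Netty based client transports; implemented by the TCP and WebSocket variants.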
*/
-public class NettyTransport {
-
- private static final Logger LOG = LoggerFactory.getLogger(NettyTransport.class);
-
- private static final int QUIET_PERIOD = 20;
- private static final int SHUTDOWN_TIMEOUT = 100;
-
- protected Bootstrap bootstrap;
- protected EventLoopGroup group;
- protected Channel channel;
- protected NettyTransportListener listener;
- protected NettyTransportOptions options;
- protected final URI remote;
- protected boolean secure;
-
- private final AtomicBoolean connected = new AtomicBoolean();
- private final AtomicBoolean closed = new AtomicBoolean();
- private final CountDownLatch connectLatch = new CountDownLatch(1);
- private IOException failureCause;
- private Throwable pendingFailure;
-
- /**
- * Create a new transport instance
- *
- * @param remoteLocation
- * the URI that defines the remote resource to connect to.
- * @param options
- * the transport options used to configure the socket connection.
- */
- public NettyTransport(URI remoteLocation, NettyTransportOptions options) {
- this(null, remoteLocation, options);
- }
-
- /**
- * Create a new transport instance
- *
- * @param listener
- * the TransportListener that will receive events from this Transport.
- * @param remoteLocation
- * the URI that defines the remote resource to connect to.
- * @param options
- * the transport options used to configure the socket connection.
- */
- public NettyTransport(NettyTransportListener listener, URI remoteLocation, NettyTransportOptions options) {
- this.options = options;
- this.listener = listener;
- this.remote = remoteLocation;
- this.secure = remoteLocation.getScheme().equalsIgnoreCase("ssl");
- }
-
- public void connect() throws IOException {
-
- if (listener == null) {
- throw new IllegalStateException("A transport listener must be set before connection attempts.");
- }
-
- group = new NioEventLoopGroup(1);
-
- bootstrap = new Bootstrap();
- bootstrap.group(group);
- bootstrap.channel(NioSocketChannel.class);
- bootstrap.handler(new ChannelInitializer<Channel>() {
-
- @Override
- public void initChannel(Channel connectedChannel) throws Exception {
- configureChannel(connectedChannel);
- }
- });
-
- configureNetty(bootstrap, getTransportOptions());
-
- ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
- future.addListener(new ChannelFutureListener() {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- handleConnected(future.channel());
- } else if (future.isCancelled()) {
- connectionFailed(future.channel(), new IOException("Connection attempt was cancelled"));
- } else {
- connectionFailed(future.channel(), IOExceptionSupport.create(future.cause()));
- }
- }
- });
-
- try {
- connectLatch.await();
- } catch (InterruptedException ex) {
- LOG.debug("Transport connection was interrupted.");
- Thread.interrupted();
- failureCause = IOExceptionSupport.create(ex);
- }
-
- if (failureCause != null) {
- // Close out any Netty resources now as they are no longer needed.
- if (channel != null) {
- channel.close().syncUninterruptibly();
- channel = null;
- }
- if (group != null) {
- group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
- group = null;
- }
-
- throw failureCause;
- } else {
- // Connected, allow any held async error to fire now and close the transport.
- channel.eventLoop().execute(new Runnable() {
-
- @Override
- public void run() {
- if (pendingFailure != null) {
- channel.pipeline().fireExceptionCaught(pendingFailure);
- }
- }
- });
- }
- }
-
- public boolean isConnected() {
- return connected.get();
- }
-
- public boolean isSSL() {
- return secure;
- }
-
- public void close() throws IOException {
- if (closed.compareAndSet(false, true)) {
- connected.set(false);
- if (channel != null) {
- channel.close().syncUninterruptibly();
- }
- if (group != null) {
- group.shutdownGracefully(QUIET_PERIOD, SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);
- }
- }
- }
-
- public ByteBuf allocateSendBuffer(int size) throws IOException {
- checkConnected();
- return channel.alloc().ioBuffer(size, size);
- }
-
- public void send(ByteBuf output) throws IOException {
- checkConnected();
- int length = output.readableBytes();
- if (length == 0) {
- return;
- }
-
- LOG.trace("Attempted write of: {} bytes", length);
-
- channel.writeAndFlush(output);
- }
-
- public NettyTransportListener getTransportListener() {
- return listener;
- }
-
- public void setTransportListener(NettyTransportListener listener) {
- this.listener = listener;
- }
-
- public NettyTransportOptions getTransportOptions() {
- if (options == null) {
- if (isSSL()) {
- options = NettyTransportSslOptions.INSTANCE;
- } else {
- options = NettyTransportOptions.INSTANCE;
- }
- }
-
- return options;
- }
-
- public URI getRemoteLocation() {
- return remote;
- }
-
- public Principal getLocalPrincipal() {
- if (!isSSL()) {
- throw new UnsupportedOperationException("Not connected to a secure channel");
- }
-
- SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
-
- return sslHandler.engine().getSession().getLocalPrincipal();
- }
-
- //----- Internal implementation details, can be overridden as needed --//
-
- protected String getRemoteHost() {
- return remote.getHost();
- }
-
- protected int getRemotePort() {
- int port = remote.getPort();
-
- if (port <= 0) {
- if (isSSL()) {
- port = getSslOptions().getDefaultSslPort();
- } else {
- port = getTransportOptions().getDefaultTcpPort();
- }
- }
-
- return port;
- }
-
- protected void configureNetty(Bootstrap bootstrap, NettyTransportOptions options) {
- bootstrap.option(ChannelOption.TCP_NODELAY, options.isTcpNoDelay());
- bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, options.getConnectTimeout());
- bootstrap.option(ChannelOption.SO_KEEPALIVE, options.isTcpKeepAlive());
- bootstrap.option(ChannelOption.SO_LINGER, options.getSoLinger());
- bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE);
-
- if (options.getSendBufferSize() != -1) {
- bootstrap.option(ChannelOption.SO_SNDBUF, options.getSendBufferSize());
- }
-
- if (options.getReceiveBufferSize() != -1) {
- bootstrap.option(ChannelOption.SO_RCVBUF, options.getReceiveBufferSize());
- bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(options.getReceiveBufferSize()));
- }
-
- if (options.getTrafficClass() != -1) {
- bootstrap.option(ChannelOption.IP_TOS, options.getTrafficClass());
- }
- }
-
- protected void configureChannel(final Channel channel) throws Exception {
- if (isSSL()) {
- SslHandler sslHandler = NettyTransportSupport.createSslHandler(getRemoteLocation(), getSslOptions());
- sslHandler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>() {
- @Override
- public void operationComplete(Future<Channel> future) throws Exception {
- if (future.isSuccess()) {
- LOG.trace("SSL Handshake has completed: {}", channel);
- connectionEstablished(channel);
- } else {
- LOG.trace("SSL Handshake has failed: {}", channel);
- connectionFailed(channel, IOExceptionSupport.create(future.cause()));
- }
- }
- });
-
- channel.pipeline().addLast(sslHandler);
- }
-
- channel.pipeline().addLast(new NettyTcpTransportHandler());
- }
+public interface NettyTransport {
- protected void handleConnected(final Channel channel) throws Exception {
- if (!isSSL()) {
- connectionEstablished(channel);
- }
- }
+ void connect() throws IOException;
- //----- State change handlers and checks ---------------------------------//
+ boolean isConnected();
- /**
- * Called when the transport has successfully connected and is ready for use.
- */
- protected void connectionEstablished(Channel connectedChannel) {
- channel = connectedChannel;
- connected.set(true);
- connectLatch.countDown();
- }
+ boolean isSSL();
- /**
- * Called when the transport connection failed and an error should be returned.
- *
- * @param failedChannel
- * The Channel instance that failed.
- * @param cause
- * An IOException that describes the cause of the failed connection.
- */
- protected void connectionFailed(Channel failedChannel, IOException cause) {
- failureCause = IOExceptionSupport.create(cause);
- channel = failedChannel;
- connected.set(false);
- connectLatch.countDown();
- }
+ void close() throws IOException;
- private NettyTransportSslOptions getSslOptions() {
- return (NettyTransportSslOptions) getTransportOptions();
- }
+ ByteBuf allocateSendBuffer(int size) throws IOException;
- private void checkConnected() throws IOException {
- if (!connected.get()) {
- throw new IOException("Cannot send to a non-connected transport.");
- }
- }
+ void send(ByteBuf output) throws IOException;
- //----- Handle connection events -----------------------------------------//
+ NettyTransportListener getTransportListener();
- private class NettyTcpTransportHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ void setTransportListener(NettyTransportListener listener);
- @Override
- public void channelActive(ChannelHandlerContext context) throws Exception {
- LOG.trace("Channel has become active! Channel is {}", context.channel());
- }
+ NettyTransportOptions getTransportOptions();
- @Override
- public void channelInactive(ChannelHandlerContext context) throws Exception {
- LOG.trace("Channel has gone inactive! Channel is {}", context.channel());
- if (connected.compareAndSet(true, false) && !closed.get()) {
- LOG.trace("Firing onTransportClosed listener");
- listener.onTransportClosed();
- }
- }
+ URI getRemoteLocation();
- @Override
- public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception {
- LOG.trace("Exception on channel! Channel is {}", context.channel());
- if (connected.compareAndSet(true, false) && !closed.get()) {
- LOG.trace("Firing onTransportError listener");
- if (pendingFailure != null) {
- listener.onTransportError(pendingFailure);
- } else {
- listener.onTransportError(cause);
- }
- } else {
- // Hold the first failure for later dispatch if connect succeeds.
- // This will then trigger disconnect using the first error reported.
- if (pendingFailure != null) {
- LOG.trace("Holding error until connect succeeds: {}", cause.getMessage());
- pendingFailure = cause;
- }
- }
- }
+ Principal getLocalPrincipal();
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
- LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
- listener.onData(buffer);
- }
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
index fd50890..cc47aa2 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportFactory.java
@@ -46,7 +46,7 @@ public final class NettyTransportFactory {
remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
- if (!remoteURI.getScheme().equalsIgnoreCase("ssl")) {
+ if (!remoteURI.getScheme().equalsIgnoreCase("ssl") && !remoteURI.getScheme().equalsIgnoreCase("wss")) {
transportOptions = NettyTransportOptions.INSTANCE.clone();
} else {
transportOptions = NettyTransportSslOptions.INSTANCE.clone();
@@ -61,7 +61,20 @@ public final class NettyTransportFactory {
throw new IllegalArgumentException(msg);
}
- NettyTransport result = new NettyTransport(remoteURI, transportOptions);
+ NettyTransport result = null;
+
+ switch (remoteURI.getScheme().toLowerCase()) {
+ case "tcp":
+ case "ssl":
+ result = new NettyTcpTransport(remoteURI, transportOptions);
+ break;
+ case "ws":
+ case "wss":
+ result = new NettyWSTransport(remoteURI, transportOptions);
+ break;
+ default:
+ throw new IllegalArgumentException("Invalid URI Scheme: " + remoteURI.getScheme());
+ }
return result;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/31c55f75/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
index 92ffd3c..b01f884 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/transport/NettyTransportSslOptions.java
@@ -30,7 +30,7 @@ public class NettyTransportSslOptions extends NettyTransportOptions {
public static final String DEFAULT_STORE_TYPE = "jks";
public static final String DEFAULT_CONTEXT_PROTOCOL = "TLS";
public static final boolean DEFAULT_TRUST_ALL = false;
- public static final boolean DEFAULT_VERIFY_HOST = true;
+ public static final boolean DEFAULT_VERIFY_HOST = false;
public static final List<String> DEFAULT_DISABLED_PROTOCOLS = Collections.unmodifiableList(Arrays.asList(new String[]{"SSLv2Hello", "SSLv3"}));
public static final int DEFAULT_SSL_PORT = 5671;