You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/08/11 22:03:13 UTC
[2/3] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5889
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java
new file mode 100644
index 0000000..e218180
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.protocol;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+
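+/**
+ * Recognizes an OpenWire connection from its first bytes: the WireFormatInfo type
+ * byte (4 bytes in unless the size prefix is disabled) followed by the magic bytes.
+ */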
+public class OpenWireProtocolVerifier implements ProtocolVerifier {
+
+ protected final OpenWireFormatFactory wireFormatFactory;
+
+ public OpenWireProtocolVerifier(OpenWireFormatFactory wireFormatFactory) {
+ this.wireFormatFactory = wireFormatFactory;
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
+ */
+ @Override
+ public boolean isProtocol(byte[] value) {
+ if (value.length < 8) {
+ throw new IllegalArgumentException("Protocol header length changed "
+ + value.length);
+ }
+
+ int start = !((OpenWireFormat)wireFormatFactory.createWireFormat()).isSizePrefixDisabled() ? 4 : 0;
+ int j = 0;
+ // type
+ if (value[start] != WireFormatInfo.DATA_STRUCTURE_TYPE) {
+ return false;
+ }
+ start++;
+ WireFormatInfo info = new WireFormatInfo();
+ final byte[] magic = info.getMagic();
+ int remainingLen = value.length - start;
+ int useLen = remainingLen > magic.length ? magic.length : remainingLen;
+ useLen += start;
+ // magic
+ for (int i = start; i < useLen; i++) {
+ if (value[i] != magic[j]) {
+ return false;
+ }
+ j++;
+ }
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java
new file mode 100644
index 0000000..c5755c7
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java
@@ -0,0 +1,9 @@
+package org.apache.activemq.broker.transport.protocol;
+
+
+public interface ProtocolVerifier {
+ /** Returns true when the given leading bytes of a connection match this verifier's protocol. */
+ public boolean isProtocol(byte[] value);
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java
new file mode 100644
index 0000000..f18f1c7
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.protocol;
+
+import java.nio.charset.StandardCharsets;
+
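+/**
+ * Recognizes a STOMP connection: the initial bytes, decoded as US-ASCII,
+ * must start with the "CONNECT" or "STOMP" frame command.
+ */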
+public class StompProtocolVerifier implements ProtocolVerifier {
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
+ */
+ @Override
+ public boolean isProtocol(byte[] value) {
+ String frameStart = new String(value, StandardCharsets.US_ASCII);
+ return frameStart.startsWith("CONNECT") || frameStart.startsWith("STOMP");
+ }
+
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
new file mode 100644
index 0000000..f922e98
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.nio;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This transport initializes the SSLEngine and reads the first command before
+ * handing off to the detected transport.
+ *
+ */
+public class AutoInitNioSSLTransport extends NIOSSLTransport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AutoInitNioSSLTransport.class);
+
+ public AutoInitNioSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+ super(wireFormat, socketFactory, remoteLocation, localLocation);
+ }
+
+ public AutoInitNioSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ super(wireFormat, socket, null, null, null);
+ }
+
+ @Override
+ public void setSslContext(SSLContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ public ByteBuffer getInputBuffer() {
+ return this.inputBuffer;
+ }
+
+ @Override
+ protected void initializeStreams() throws IOException {
+ NIOOutputStream outputStream = null;
+ try {
+ channel = socket.getChannel();
+ channel.configureBlocking(false);
+
+ if (sslContext == null) {
+ sslContext = SSLContext.getDefault();
+ }
+
+ String remoteHost = null;
+ int remotePort = -1;
+
+ try {
+ URI remoteAddress = new URI(this.getRemoteAddress());
+ remoteHost = remoteAddress.getHost();
+ remotePort = remoteAddress.getPort();
+ } catch (Exception e) {
+ }
+
+ // initialize engine, the initial sslSession we get will need to be
+ // updated once the ssl handshake process is completed.
+ if (remoteHost != null && remotePort != -1) {
+ sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
+ } else {
+ sslEngine = sslContext.createSSLEngine();
+ }
+
+ sslEngine.setUseClientMode(false);
+ if (enabledCipherSuites != null) {
+ sslEngine.setEnabledCipherSuites(enabledCipherSuites);
+ }
+
+ if (enabledProtocols != null) {
+ sslEngine.setEnabledProtocols(enabledProtocols);
+ }
+
+ if (wantClientAuth) {
+ sslEngine.setWantClientAuth(wantClientAuth);
+ }
+
+ if (needClientAuth) {
+ sslEngine.setNeedClientAuth(needClientAuth);
+ }
+
+ sslSession = sslEngine.getSession();
+
+ inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
+ inputBuffer.clear();
+
+ outputStream = new NIOOutputStream(channel);
+ outputStream.setEngine(sslEngine);
+ this.dataOut = new DataOutputStream(outputStream);
+ this.buffOut = outputStream;
+ sslEngine.beginHandshake();
+ handshakeStatus = sslEngine.getHandshakeStatus();
+ doHandshake();
+ // detectReadyState();
+ } catch (Exception e) {
+ try {
+ if(outputStream != null) {
+ outputStream.close();
+ }
+ super.closeStreams();
+ } catch (Exception ex) {}
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ protected void doOpenWireInit() throws Exception {
+
+ }
+
+
+ @Override
+ protected void finishHandshake() throws Exception {
+ if (handshakeInProgress) {
+ handshakeInProgress = false;
+ nextFrameSize = -1;
+
+ // Once handshake completes we need to ask for the now real sslSession
+ // otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
+ // cipher suite.
+ sslSession = sslEngine.getSession();
+
+ }
+ }
+
+ public SSLEngine getSslSession() {
+ return this.sslEngine;
+ }
+
+ public volatile byte[] read;
+ public volatile int readSize;
+
+ @Override
+ public void serviceRead() {
+ try {
+ if (handshakeInProgress) {
+ doHandshake();
+ }
+
+ ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
+ plain.position(plain.limit());
+
+ while (true) {
+ if (!plain.hasRemaining()) {
+
+ int readCount = secureRead(plain);
+
+ if (readCount == 0) {
+ break;
+ }
+
+ // channel is closed, cleanup
+ if (readCount == -1) {
+ onException(new EOFException());
+ selection.close();
+ break;
+ }
+
+ receiveCounter += readCount;
+ }
+
+ if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
+ processCommand(plain);
+ //Break when command is found
+ break;
+ }
+ }
+ } catch (IOException e) {
+ onException(e);
+ } catch (Throwable e) {
+ onException(IOExceptionSupport.create(e));
+ }
+ }
+
+ @Override
+ protected void processCommand(ByteBuffer plain) throws Exception {
+ read = plain.array();
+ readSize = receiveCounter;
+ }
+
+
+ @Override
+ public void doStart() throws Exception {
+ taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
+ // no need to init as we can delay that until demand (eg in doHandshake)
+ connect();
+ //super.doStart();
+ }
+
+
+ @Override
+ protected void doStop(ServiceStopper stopper) throws Exception {
+ if (taskRunnerFactory != null) {
+ taskRunnerFactory.shutdownNow();
+ taskRunnerFactory = null;
+ }
+// if (selection != null) {
+// selection.close();
+// selection = null;
+// }
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
new file mode 100644
index 0000000..d6791eb
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.AutoTcpTransportFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
new file mode 100644
index 0000000..a03cc27
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.nio.AutoNioTransportFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
new file mode 100644
index 0000000..74a08c1
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.nio.AutoNioSslTransportFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
new file mode 100644
index 0000000..d6e626e
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.AutoSslTransportFactory
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-camel/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/pom.xml b/activemq-camel/pom.xml
index 1f19539..11f9610 100755
--- a/activemq-camel/pom.xml
+++ b/activemq-camel/pom.xml
@@ -254,7 +254,27 @@
</plugins>
</build>
</profile>
-
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml
index 3fb3ddd..81531cc 100755
--- a/activemq-client/pom.xml
+++ b/activemq-client/pom.xml
@@ -345,5 +345,26 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index a25737f..10eebd3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq;
+import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.AccessController;
@@ -314,7 +315,22 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
*/
protected Transport createTransport() throws JMSException {
try {
- return TransportFactory.connect(brokerURL);
+ URI connectBrokerUL = brokerURL;
+ String scheme = brokerURL.getScheme();
+ if (scheme == null) {
+ throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
+ }
+ // Swap only the leading scheme; String.replace() would also rewrite "auto" inside the host or "auto." options.
+ if (scheme.equals("auto")) {
+ connectBrokerUL = new URI("tcp" + brokerURL.toString().substring(scheme.length()));
+ } else if (scheme.equals("auto+ssl")) {
+ connectBrokerUL = new URI("ssl" + brokerURL.toString().substring(scheme.length()));
+ } else if (scheme.equals("auto+nio")) {
+ connectBrokerUL = new URI("nio" + brokerURL.toString().substring(scheme.length()));
+ } else if (scheme.equals("auto+nio+ssl")) {
+ connectBrokerUL = new URI("nio+ssl" + brokerURL.toString().substring(scheme.length()));
+ }
+ return TransportFactory.connect(connectBrokerUL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
index 38b1ae3..327ea42 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
@@ -18,15 +18,19 @@ package org.apache.activemq.transport;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
+import javax.net.ssl.SSLEngine;
+
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
@@ -119,6 +123,9 @@ public abstract class TransportFactory {
WireFormat wf = createWireFormat(options);
Transport transport = createTransport(location, wf);
Transport rc = configure(transport, wf, options);
+ //remove auto
+ IntrospectionSupport.extractProperties(options, "auto.");
+
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index 6b8a446..eeb68d8 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -69,16 +69,26 @@ public class NIOSSLTransport extends NIOTransport {
super(wireFormat, socketFactory, remoteLocation, localLocation);
}
- public NIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
- super(wireFormat, socket);
+ public NIOSSLTransport(WireFormat wireFormat, Socket socket, SSLEngine engine, InitBuffer initBuffer,
+ ByteBuffer inputBuffer) throws IOException {
+ super(wireFormat, socket, initBuffer);
+ this.sslEngine = engine;
+ if (engine != null)
+ this.sslSession = engine.getSession();
+ this.inputBuffer = inputBuffer;
}
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}
+ volatile boolean hasSslEngine = false;
+
@Override
protected void initializeStreams() throws IOException {
+ if (sslEngine != null) {
+ hasSslEngine = true;
+ }
NIOOutputStream outputStream = null;
try {
channel = socket.getChannel();
@@ -100,41 +110,66 @@ public class NIOSSLTransport extends NIOTransport {
// initialize engine, the initial sslSession we get will need to be
// updated once the ssl handshake process is completed.
- if (remoteHost != null && remotePort != -1) {
- sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
- } else {
- sslEngine = sslContext.createSSLEngine();
- }
+ if (!hasSslEngine) {
+ if (remoteHost != null && remotePort != -1) {
+ sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
+ } else {
+ sslEngine = sslContext.createSSLEngine();
+ }
- sslEngine.setUseClientMode(false);
- if (enabledCipherSuites != null) {
- sslEngine.setEnabledCipherSuites(enabledCipherSuites);
- }
+ sslEngine.setUseClientMode(false);
+ if (enabledCipherSuites != null) {
+ sslEngine.setEnabledCipherSuites(enabledCipherSuites);
+ }
- if (enabledProtocols != null) {
- sslEngine.setEnabledProtocols(enabledProtocols);
- }
+ if (enabledProtocols != null) {
+ sslEngine.setEnabledProtocols(enabledProtocols);
+ }
- if (wantClientAuth) {
- sslEngine.setWantClientAuth(wantClientAuth);
- }
+ if (wantClientAuth) {
+ sslEngine.setWantClientAuth(wantClientAuth);
+ }
- if (needClientAuth) {
- sslEngine.setNeedClientAuth(needClientAuth);
- }
+ if (needClientAuth) {
+ sslEngine.setNeedClientAuth(needClientAuth);
+ }
- sslSession = sslEngine.getSession();
+ sslSession = sslEngine.getSession();
- inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
- inputBuffer.clear();
+ inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
+ inputBuffer.clear();
+ }
outputStream = new NIOOutputStream(channel);
outputStream.setEngine(sslEngine);
this.dataOut = new DataOutputStream(outputStream);
this.buffOut = outputStream;
- sslEngine.beginHandshake();
+
+ //If the sslEngine was not passed in, then handshake
+ if (!hasSslEngine)
+ sslEngine.beginHandshake();
handshakeStatus = sslEngine.getHandshakeStatus();
- doHandshake();
+ if (!hasSslEngine)
+ doHandshake();
+
+ // if (hasSslEngine) {
+ selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+ @Override
+ public void onSelect(SelectorSelection selection) {
+ serviceRead();
+ }
+
+ @Override
+ public void onError(SelectorSelection selection, Throwable error) {
+ if (error instanceof IOException) {
+ onException((IOException) error);
+ } else {
+ onException(IOExceptionSupport.create(error));
+ }
+ }
+ });
+ doInit();
+
} catch (Exception e) {
try {
if(outputStream != null) {
@@ -146,6 +181,24 @@ public class NIOSSLTransport extends NIOTransport {
}
}
+ protected void doInit() throws Exception {
+
+ }
+
+ protected void doOpenWireInit() throws Exception {
+ //Do this later to let wire format negotiation happen
+ if (initBuffer != null && this.wireFormat instanceof OpenWireFormat) {
+ initBuffer.buffer.flip();
+ if (initBuffer.buffer.hasRemaining()) {
+ nextFrameSize = -1;
+ receiveCounter += initBuffer.readSize;
+ processCommand(initBuffer.buffer);
+ processCommand(initBuffer.buffer);
+ initBuffer.buffer.clear();
+ }
+ }
+ }
+
protected void finishHandshake() throws Exception {
if (handshakeInProgress) {
handshakeInProgress = false;
@@ -176,12 +229,14 @@ public class NIOSSLTransport extends NIOTransport {
}
@Override
- protected void serviceRead() {
+ public void serviceRead() {
try {
if (handshakeInProgress) {
doHandshake();
}
+ doOpenWireInit();
+
ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
plain.position(plain.limit());
@@ -293,7 +348,7 @@ public class NIOSSLTransport extends NIOTransport {
doConsume(command);
nextFrameSize = -1;
currentBuffer = null;
- }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
index 26e59e4..405314f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
@@ -18,20 +18,25 @@
package org.apache.activemq.transport.nio;
import java.io.IOException;
+import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.SslTransport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
@@ -44,6 +49,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
protected SSLContext context;
+ @Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new NIOSSLTransportServer(context, this, location, serverSocketFactory);
}
@@ -64,6 +70,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
* Overriding to allow for proper configuration through reflection but
* delegate to get common configuration
*/
+ @Override
public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
if (transport instanceof SslTransport) {
SslTransport sslTransport = (SslTransport) transport.narrow(SslTransport.class);
@@ -79,6 +86,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
/**
* Overriding to use SslTransports.
*/
+ @Override
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
URI localLocation = null;
@@ -98,6 +106,13 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
return new SslTransport(wf, (SSLSocketFactory) socketFactory, location, localLocation, false);
}
+ /**
+ * Builds an {@link NIOSSLTransport} for an existing socket, handing the
+ * supplied SSL engine and buffers to the transport constructor unchanged.
+ *
+ * @param wireFormat wire format passed to the new transport
+ * @param socket socket passed to the new transport
+ * @param engine SSL engine passed to the new transport
+ * @param initBuffer init buffer passed to the new transport
+ * @param inputBuffer input buffer passed to the new transport
+ * @return the newly constructed {@link NIOSSLTransport}
+ * @throws IOException if the transport constructor throws
+ */
+ @Override
+ public TcpTransport createTransport(WireFormat wireFormat, Socket socket, SSLEngine engine,
+ InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
+ final NIOSSLTransport sslTransport =
+ new NIOSSLTransport(wireFormat, socket, engine, initBuffer, inputBuffer);
+ return sslTransport;
+ }
+
/**
* Creates a new SSL SocketFactory. The given factory will use user-provided
* key and trust managers (if the user provided them).
@@ -105,6 +120,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
* @return Newly created (Ssl)SocketFactory.
* @throws IOException
*/
+ @Override
protected SocketFactory createSocketFactory() throws IOException {
if (SslContext.getCurrentSslContext() != null) {
SslContext ctx = SslContext.getCurrentSslContext();
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
index 06a310b..e07d82b 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
@@ -44,7 +44,7 @@ public class NIOSSLTransportServer extends TcpTransportServer {
@Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
- NIOSSLTransport transport = new NIOSSLTransport(format, socket);
+ NIOSSLTransport transport = new NIOSSLTransport(format, socket, null, null, null);
if (context != null) {
transport.setSslContext(context);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
index 6f7a1af..81a2938 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
@@ -58,6 +58,16 @@ public class NIOTransport extends TcpTransport {
super(wireFormat, socket);
}
+ /**
+ * Creates a transport over an existing socket, passing an init buffer
+ * through to the superclass constructor.
+ *
+ * @param format the wire format handed to the superclass constructor
+ * @param socket the socket handed to the superclass constructor
+ * @param initBuffer the init buffer handed to the superclass constructor
+ * @throws IOException if the superclass constructor throws
+ */
+ public NIOTransport(WireFormat format, Socket socket, InitBuffer initBuffer) throws IOException {
+ super(format, socket, initBuffer);
+ }
+
@Override
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
@@ -91,11 +101,15 @@ public class NIOTransport extends TcpTransport {
this.buffOut = outPutStream;
}
+ /**
+ * Performs one read from the channel into {@code currentBuffer}.
+ * {@code serviceRead()} obtains its bytes through this method rather
+ * than reading the channel directly.
+ *
+ * @return whatever {@code channel.read(currentBuffer)} returns;
+ * {@code serviceRead()} treats -1 as end of stream
+ * @throws IOException if the channel read fails
+ */
+ protected int readFromBuffer() throws IOException {
+ final int bytesRead = channel.read(currentBuffer);
+ return bytesRead;
+ }
+
protected void serviceRead() {
try {
while (true) {
- int readSize = channel.read(currentBuffer);
+ int readSize = readFromBuffer();
if (readSize == -1) {
onException(new EOFException());
selection.close();
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
index 249e48b..fd3a1ea 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
@@ -32,38 +32,52 @@ import javax.net.SocketFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
public class NIOTransportFactory extends TcpTransportFactory {
+ @Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
return new TcpTransportServer(this, location, serverSocketFactory) {
+ @Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new NIOTransport(format, socket);
}
};
}
+ @Override
protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
return new NIOTransport(wf, socketFactory, location, localLocation);
}
+ /**
+ * Builds an {@link NIOTransport} for an existing socket, handing the
+ * supplied init buffer to the transport constructor unchanged.
+ *
+ * @param wireFormat wire format passed to the new transport
+ * @param socket socket passed to the new transport
+ * @param initBuffer init buffer passed to the new transport
+ * @return the newly constructed {@link NIOTransport}
+ * @throws IOException if the transport constructor throws
+ */
+ @Override
+ public TcpTransport createTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer)
+ throws IOException {
+ final NIOTransport nioTransport = new NIOTransport(wireFormat, socket, initBuffer);
+ return nioTransport;
+ }
+
+ @Override
protected ServerSocketFactory createServerSocketFactory() {
return new ServerSocketFactory() {
+ @Override
public ServerSocket createServerSocket(int port) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port));
return serverSocketChannel.socket();
}
+ @Override
public ServerSocket createServerSocket(int port, int backlog) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
return serverSocketChannel.socket();
}
+ @Override
public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
@@ -72,26 +86,31 @@ public class NIOTransportFactory extends TcpTransportFactory {
};
}
+ @Override
protected SocketFactory createSocketFactory() throws IOException {
return new SocketFactory() {
+ @Override
public Socket createSocket() throws IOException {
SocketChannel channel = SocketChannel.open();
return channel.socket();
}
+ @Override
public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress(host, port));
return channel.socket();
}
+ @Override
public Socket createSocket(InetAddress address, int port) throws IOException {
SocketChannel channel = SocketChannel.open();
channel.connect(new InetSocketAddress(address, port));
return channel.socket();
}
+ @Override
public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
SocketChannel channel = SocketChannel.open();
channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
@@ -99,6 +118,7 @@ public class NIOTransportFactory extends TcpTransportFactory {
return channel.socket();
}
+ @Override
public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
SocketChannel channel = SocketChannel.open();
channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
index 24659f7..2b3953f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
@@ -29,7 +29,6 @@ import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.apache.activemq.command.ConnectionInfo;
-
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
@@ -46,7 +45,7 @@ import org.apache.activemq.wireformat.WireFormat;
public class SslTransport extends TcpTransport {
/**
* Connect to a remote node such as a Broker.
- *
+ *
* @param wireFormat The WireFormat to be used.
* @param socketFactory The socket factory to be used. Forcing SSLSockets
* for obvious reasons.
@@ -76,7 +75,7 @@ public class SslTransport extends TcpTransport {
/**
* Initialize from a ServerSocket. No access to needClientAuth is given
* since it is already set within the provided socket.
- *
+ *
* @param wireFormat The WireFormat to be used.
* @param socket The Socket to be used. Forcing SSL.
* @throws IOException If TcpTransport throws.
@@ -85,12 +84,18 @@ public class SslTransport extends TcpTransport {
super(wireFormat, socket);
}
+ /**
+ * Initialize from an existing SSL socket, passing an init buffer through
+ * to the superclass constructor.
+ *
+ * @param format The WireFormat to be used.
+ * @param socket The Socket to be used. Forcing SSL.
+ * @param initBuffer The init buffer handed to the superclass constructor.
+ * @throws IOException If TcpTransport throws.
+ */
+ public SslTransport(WireFormat format, SSLSocket socket,
+ InitBuffer initBuffer) throws IOException {
+ super(format, socket, initBuffer);
+ }
+
/**
* Overriding in order to add the client's certificates to ConnectionInfo
* Commmands.
- *
+ *
* @param command The Command coming in.
*/
+ @Override
public void doConsume(Object command) {
// The instanceof can be avoided, but that would require modifying the
// Command clas tree and that would require too much effort right
@@ -98,15 +103,15 @@ public class SslTransport extends TcpTransport {
if (command instanceof ConnectionInfo) {
ConnectionInfo connectionInfo = (ConnectionInfo)command;
connectionInfo.setTransportContext(getPeerCertificates());
- }
+ }
super.doConsume(command);
}
-
+
/**
* @return peer certificate chain associated with the ssl socket
*/
public X509Certificate[] getPeerCertificates() {
-
+
SSLSocket sslSocket = (SSLSocket)this.socket;
SSLSession sslSession = sslSocket.getSession();
@@ -117,13 +122,14 @@ public class SslTransport extends TcpTransport {
} catch (SSLPeerUnverifiedException e) {
clientCertChain = null;
}
-
+
return clientCertChain;
}
/**
* @return pretty print of 'this'
*/
+ @Override
public String toString() {
return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
index bdb817d..6ee08b6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
@@ -22,8 +22,8 @@ import java.io.InputStream;
/**
* An optimized buffered input stream for Tcp
- *
- *
+ *
+ *
*/
public class TcpBufferedInputStream extends FilterInputStream {
private static final int DEFAULT_BUFFER_SIZE = 8192;
@@ -53,6 +53,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
}
}
+ @Override
public int read() throws IOException {
if (position >= count) {
fill();
@@ -81,6 +82,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
return cnt;
}
+ @Override
public int read(byte b[], int off, int len) throws IOException {
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
@@ -105,6 +107,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
}
}
+ @Override
public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
@@ -118,17 +121,34 @@ public class TcpBufferedInputStream extends FilterInputStream {
return skipped;
}
+ @Override
public int available() throws IOException {
return in.available() + (count - position);
}
+ @Override
public boolean markSupported() {
return false;
}
+ @Override
public void close() throws IOException {
if (in != null) {
in.close();
}
}
+
+ /**
+ * Pushes bytes back onto the front of this stream so that they are the
+ * next bytes handed out by the read methods.
+ * <p>
+ * Bytes that are already buffered but not yet consumed are preserved and
+ * are returned after the pushed-back bytes.
+ *
+ * @param array the bytes to push back; the whole array is used
+ * @throws IOException if the internal buffer cannot hold the pushed-back
+ * bytes together with the bytes still pending in it
+ */
+ public void unread(byte[] array) throws IOException {
+ // Bytes sitting in the buffer that have not been consumed yet.
+ int pending = count - position;
+ if (array.length > internalBuffer.length - pending) {
+ throw new IOException("Buffer is full, can't unread");
+ }
+
+ // Slide the pending bytes behind the space needed for the pushed-back
+ // data (System.arraycopy is safe for overlapping ranges), then copy the
+ // pushed-back data from the START of the caller's array to the front.
+ System.arraycopy(internalBuffer, position, internalBuffer, array.length, pending);
+ System.arraycopy(array, 0, internalBuffer, 0, array.length);
+ position = 0;
+ count = array.length + pending;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
index 8f515a8..335cde7 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
@@ -16,6 +16,27 @@ gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
*/
package org.apache.activemq.transport.tcp;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
import org.apache.activemq.Service;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -28,18 +49,6 @@ import org.apache.activemq.wireformat.WireFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.SocketFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.*;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* An implementation of the {@link Transport} interface using raw tcp/ip
*
@@ -62,6 +71,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
protected DataInputStream dataIn;
protected TimeStampStream buffOut = null;
+ protected final InitBuffer initBuffer;
+
/**
* The Traffic Class to be set on the socket.
*/
@@ -149,6 +160,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
}
this.remoteLocation = remoteLocation;
this.localLocation = localLocation;
+ this.initBuffer = null;
setDaemon(false);
}
@@ -160,16 +172,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
* @throws IOException
*/
public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
+ this(wireFormat, socket, null);
+ }
+
+ public TcpTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
this.wireFormat = wireFormat;
this.socket = socket;
this.remoteLocation = null;
this.localLocation = null;
+ this.initBuffer = initBuffer;
setDaemon(true);
}
/**
* A one way asynchronous send
*/
+ @Override
public void oneway(Object command) throws IOException {
checkStarted();
wireFormat.marshal(command, dataOut);
@@ -188,6 +206,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
/**
* reads packets from a Socket
*/
+ @Override
public void run() {
LOG.trace("TCP consumer thread for " + this + " starting");
this.runnerThread=Thread.currentThread();
@@ -536,6 +555,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
// need a async task for this
final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
taskRunnerFactory.execute(new Runnable() {
+ @Override
public void run() {
LOG.trace("Closing socket {}", socket);
try {
@@ -609,10 +629,16 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
super.fill();
}
};
+ //Unread the initBuffer that was used for protocol detection if it exists
+ //so the stream can start over
+ if (initBuffer != null) {
+ buffIn.unread(initBuffer.buffer.array());
+ }
this.dataIn = new DataInputStream(buffIn);
TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
this.dataOut = new DataOutputStream(outputStream);
this.buffOut = outputStream;
+
}
protected void closeStreams() throws IOException {
@@ -628,6 +654,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
this.socketOptions = new HashMap<String, Object>(socketOptions);
}
+ @Override
public String getRemoteAddress() {
if (socket != null) {
SocketAddress address = socket.getRemoteSocketAddress();
@@ -650,10 +677,24 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
return super.narrow(target);
}
+ @Override
public int getReceiveCounter() {
return receiveCounter;
}
+ /**
+ * Holder pairing a byte buffer with a read size. The fields are final,
+ * but the buffer itself remains mutable.
+ */
+ public static class InitBuffer {
+ // presumably the number of bytes already read into buffer - TODO confirm against callers
+ public final int readSize;
+ // never null; enforced by the constructor
+ public final ByteBuffer buffer;
+
+ /**
+ * @param readSize value stored in {@link #readSize}
+ * @param buffer buffer stored in {@link #buffer}; must not be null
+ * @throws IllegalArgumentException if {@code buffer} is null
+ */
+ public InitBuffer(int readSize, ByteBuffer buffer) {
+ if (null == buffer) {
+ throw new IllegalArgumentException("Null buffer not allowed.");
+ }
+ this.buffer = buffer;
+ this.readSize = readSize;
+ }
+ }
+
/**
* @param sock The socket on which to set the Traffic Class.
* @return Whether or not the Traffic Class was set on the given socket.
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
index 3d2fa44..ae555fd 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
@@ -17,14 +17,17 @@
package org.apache.activemq.transport.tcp;
import java.io.IOException;
+import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
+import javax.net.ssl.SSLEngine;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.openwire.OpenWireFormat;
@@ -33,6 +36,7 @@ import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
@@ -139,6 +143,15 @@ public class TcpTransportFactory extends TransportFactory {
return createTcpTransport(wf, socketFactory, location, localLocation);
}
+ /**
+ * Creates a transport for an existing socket together with an init buffer.
+ * This implementation always throws; a factory that supports this form
+ * of creation must override the method.
+ *
+ * @param wireFormat not used by this implementation
+ * @param socket not used by this implementation
+ * @param initBuffer not used by this implementation
+ * @return never returns normally in this implementation
+ * @throws IOException always, in this implementation
+ */
+ public TcpTransport createTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
+ throw new IOException("createTransport() method not implemented!");
+ }
+
+ /**
+ * Creates a transport for an existing socket together with an SSL engine,
+ * an init buffer and an input buffer. This implementation always throws;
+ * a factory that supports this form of creation must override the method.
+ *
+ * @param wireFormat not used by this implementation
+ * @param socket not used by this implementation
+ * @param engine not used by this implementation
+ * @param initBuffer not used by this implementation
+ * @param inputBuffer not used by this implementation
+ * @return never returns normally in this implementation
+ * @throws IOException always, in this implementation
+ */
+ public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+ SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
+ throw new IOException("createTransport() method not implemented!");
+ }
+
/**
* Allows subclasses of TcpTransportFactory to provide a create custom
* TcpTransport instances.
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index a0778cd..c7fe00f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -44,6 +44,7 @@ import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.openwire.OpenWireFormatFactory;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.TransportServerThreadSupport;
import org.apache.activemq.transport.nio.SelectorManager;
@@ -474,7 +475,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
return (InetSocketAddress) serverSocket.getLocalSocketAddress();
}
- protected final void handleSocket(Socket socket) {
+ /**
+ * Handles a socket. This implementation only delegates to
+ * {@code doHandleSocket(Socket)}.
+ *
+ * @param socket the socket to handle
+ */
+ protected void handleSocket(Socket socket) {
+ doHandleSocket(socket);
+ }
+
+ final protected void doHandleSocket(Socket socket) {
boolean closeSocket = true;
try {
if (this.currentTransportCount.get() >= this.maximumConnections) {
@@ -483,6 +488,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
"maximumConnections' property on the TCP transport configuration URI " +
"in the ActiveMQ configuration file (e.g., activemq.xml)");
} else {
+ currentTransportCount.incrementAndGet();
+
HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
@@ -496,22 +503,23 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
options.put("startLogging", Boolean.valueOf(startLogging));
options.putAll(transportOptions);
- WireFormat format = wireFormatFactory.createWireFormat();
- Transport transport = createTransport(socket, format);
+ TransportInfo transportInfo = configureTransport(this, socket);
closeSocket = false;
- if (transport instanceof ServiceSupport) {
- ((ServiceSupport) transport).addServiceListener(this);
+ if (transportInfo.transport instanceof ServiceSupport) {
+ ((ServiceSupport) transportInfo.transport).addServiceListener(this);
}
- Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
+ Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
+ transportInfo.transport, transportInfo.format, options);
getAcceptListener().onAccept(configuredTransport);
- currentTransportCount.incrementAndGet();
}
} catch (SocketTimeoutException ste) {
// expect this to happen
+ currentTransportCount.decrementAndGet();
} catch (Exception e) {
+ currentTransportCount.decrementAndGet();
if (closeSocket) {
try {
socket.close();
@@ -528,6 +536,24 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
}
+ /**
+ * Creates a wire format and a transport for the given socket and bundles
+ * them with this server's transport factory.
+ *
+ * @param server not used by this implementation
+ * @param socket the socket the transport is created for
+ * @return the wire format, the transport and the transport factory
+ * @throws Exception if creating the wire format or the transport fails
+ */
+ protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
+ final WireFormat wireFormat = wireFormatFactory.createWireFormat();
+ final Transport created = createTransport(socket, wireFormat);
+ return new TransportInfo(wireFormat, created, transportFactory);
+ }
+
+ /**
+ * Simple holder for a wire format, the transport created with it and the
+ * transport factory to configure that transport with.
+ */
+ protected class TransportInfo {
+ final WireFormat format;
+ final Transport transport;
+ final TransportFactory transportFactory;
+
+ /**
+ * @param format value stored in {@code format}
+ * @param transport value stored in {@code transport}
+ * @param transportFactory value stored in {@code transportFactory}
+ */
+ public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
+ this.transportFactory = transportFactory;
+ this.transport = transport;
+ this.format = format;
+ }
+ }
+
public int getSoTimeout() {
return soTimeout;
}
@@ -567,6 +593,10 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
this.maximumConnections = maximumConnections;
}
+ /**
+ * Returns the counter object itself, not a snapshot of its value, so
+ * changes made through the returned reference affect this server's count.
+ *
+ * @return the {@code currentTransportCount} counter
+ */
+ public AtomicInteger getCurrentTransportCount() {
+ return currentTransportCount;
+ }
+
@Override
public void started(Service service) {
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-http/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml
index ef1f823..7d00276 100755
--- a/activemq-http/pom.xml
+++ b/activemq-http/pom.xml
@@ -220,6 +220,26 @@
</plugins>
</build>
</profile>
-
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-jaas/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jaas/pom.xml b/activemq-jaas/pom.xml
index 48f2fb1..ce4ba07 100644
--- a/activemq-jaas/pom.xml
+++ b/activemq-jaas/pom.xml
@@ -148,5 +148,26 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-jms-pool/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/pom.xml b/activemq-jms-pool/pom.xml
index 9f2c8b7..ca4321d 100755
--- a/activemq-jms-pool/pom.xml
+++ b/activemq-jms-pool/pom.xml
@@ -132,6 +132,27 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-kahadb-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml
index d647725..80787ee 100755
--- a/activemq-kahadb-store/pom.xml
+++ b/activemq-kahadb-store/pom.xml
@@ -251,7 +251,28 @@
</plugins>
</build>
</profile>
-
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+
<profile>
<id>activemq.tests.aix.excludes</id>
<activation>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-karaf-itest/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-karaf-itest/pom.xml b/activemq-karaf-itest/pom.xml
index 063e6e8..5044fcb 100644
--- a/activemq-karaf-itest/pom.xml
+++ b/activemq-karaf-itest/pom.xml
@@ -267,6 +267,27 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-leveldb-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/pom.xml b/activemq-leveldb-store/pom.xml
index d39679d..525e413 100644
--- a/activemq-leveldb-store/pom.xml
+++ b/activemq-leveldb-store/pom.xml
@@ -512,6 +512,27 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
<profile>
<id>activemq.tests.windows.excludes</id>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 3aa1f70..36384e0 100755
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -289,6 +289,27 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/auto/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
<repositories>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
index d82fcb5..0d3a838 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import javax.net.SocketFactory;
+import javax.net.ssl.SSLEngine;
import org.apache.activemq.transport.nio.NIOSSLTransport;
import org.apache.activemq.wireformat.WireFormat;
@@ -37,7 +38,12 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
}
public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
- super(wireFormat, socket);
+ super(wireFormat, socket, null, null, null);
+ }
+
+ /**
+ * Creates a transport over an existing socket, passing the SSL engine and
+ * both buffers through to the superclass constructor.
+ *
+ * @param wireFormat the wire format handed to the superclass constructor
+ * @param socket the socket handed to the superclass constructor
+ * @param engine the SSL engine handed to the superclass constructor
+ * @param initBuffer the init buffer handed to the superclass constructor
+ * @param inputBuffer the input buffer handed to the superclass constructor
+ * @throws IOException if the superclass constructor throws
+ */
+ public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket,
+ SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
+ super(wireFormat, socket, engine, initBuffer, inputBuffer);
}
@Override
@@ -56,4 +62,19 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
codec.parse(dis, fill.length);
}
+
+ /* (non-Javadoc)
+ * @see org.apache.activemq.transport.nio.NIOSSLTransport#doInit()
+ */
+ @Override
+ protected void doInit() throws Exception {
+ if (initBuffer != null) {
+ nextFrameSize = -1;
+ receiveCounter += initBuffer.readSize;
+ initBuffer.buffer.flip();
+ processCommand(initBuffer.buffer);
+ }
+ }
+
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
index f13b537..b3a74bc 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
@@ -21,14 +21,18 @@ import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
import javax.net.ServerSocketFactory;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
import org.apache.activemq.broker.SslContext;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.wireformat.WireFormat;
@@ -39,6 +43,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
@Override
protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
+ @Override
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
if (context != null) {
@@ -57,6 +62,13 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
}
@Override
+ public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+ SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
+ throws IOException {
+ return new MQTTNIOSSLTransport(wireFormat, socket, engine, initBuffer, inputBuffer);
+ }
+
+ @Override
public TransportServer doBind(URI location) throws IOException {
if (SslContext.getCurrentSslContext() != null) {
try {
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
index e750366..ef33b20 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
@@ -56,6 +56,10 @@ public class MQTTNIOTransport extends TcpTransport {
super(wireFormat, socket);
}
+ public MQTTNIOTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
+ super(wireFormat, socket, initBuffer);
+ }
+
@Override
protected void initializeStreams() throws IOException {
channel = socket.getChannel();
@@ -84,6 +88,16 @@ public class MQTTNIOTransport extends TcpTransport {
dataOut = new DataOutputStream(outPutStream);
buffOut = outPutStream;
codec = new MQTTCodec(this, (MQTTWireFormat) getWireFormat());
+
+ try {
+ if (initBuffer != null) {
+ processBuffer(initBuffer.buffer, initBuffer.readSize);
+ }
+ } catch (IOException e) {
+ onException(e);
+ } catch (Throwable e) {
+ onException(IOExceptionSupport.create(e));
+ }
}
private void serviceRead() {
@@ -103,14 +117,7 @@ public class MQTTNIOTransport extends TcpTransport {
break;
}
- inputBuffer.flip();
- DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
- codec.parse(dis, readSize);
-
- receiveCounter += readSize;
-
- // clear the buffer
- inputBuffer.clear();
+ processBuffer(inputBuffer, readSize);
}
} catch (IOException e) {
onException(e);
@@ -119,6 +126,17 @@ public class MQTTNIOTransport extends TcpTransport {
}
}
+ protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
+ buffer.flip();
+ DataByteArrayInputStream dis = new DataByteArrayInputStream(buffer.array());
+ codec.parse(dis, readSize);
+
+ receiveCounter += readSize;
+
+ // clear the buffer
+ buffer.clear();
+ }
+
@Override
protected void doStart() throws Exception {
connect();
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index efc85be..5823add 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -33,6 +33,7 @@ import org.apache.activemq.transport.MutexTransport;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.nio.NIOTransportFactory;
import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.wireformat.WireFormat;
@@ -66,6 +67,12 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
}
+ @Override
+ public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+ InitBuffer initBuffer) throws IOException {
+ return new MQTTNIOTransport(wireFormat, socket, initBuffer);
+ }
+
@SuppressWarnings("rawtypes")
@Override
public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
new file mode 100644
index 0000000..e777385
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto+nio+ssl" transport scheme with SSL enabled.
+ */
+public class MQTTAutoNioSslTest extends MQTTTest {
+
+ @Override
+ public String getProtocolScheme() {
+ return "auto+nio+ssl";
+ }
+
+ @Override
+ public boolean isUseSSL() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
new file mode 100644
index 0000000..f7023a3
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto+nio" transport scheme without SSL.
+ */
+public class MQTTAutoNioTest extends MQTTTest {
+
+ @Override
+ public String getProtocolScheme() {
+ return "auto+nio";
+ }
+
+ @Override
+ public boolean isUseSSL() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
new file mode 100644
index 0000000..e31f494
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto+ssl" transport scheme with SSL enabled.
+ */
+public class MQTTAutoSslTest extends MQTTTest {
+
+ @Override
+ public String getProtocolScheme() {
+ return "auto+ssl";
+ }
+
+ @Override
+ public boolean isUseSSL() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
new file mode 100644
index 0000000..7471f6e
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto" transport scheme without SSL.
+ */
+public class MQTTAutoTest extends MQTTTest {
+
+ @Override
+ public String getProtocolScheme() {
+ return "auto";
+ }
+
+ @Override
+ public boolean isUseSSL() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-partition/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml
index fa2bc8f..3df537e 100644
--- a/activemq-partition/pom.xml
+++ b/activemq-partition/pom.xml
@@ -130,6 +130,27 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-pool/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-pool/pom.xml b/activemq-pool/pom.xml
index 2b9d751..d94dde4 100755
--- a/activemq-pool/pom.xml
+++ b/activemq-pool/pom.xml
@@ -104,5 +104,29 @@
<scope>test</scope>
</dependency>
</dependencies>
+
+ <profiles>
+ <profile>
+ <id>activemq.tests-autoTransport</id>
+ <activation>
+ <property>
+ <name>activemq.tests</name>
+ <value>autoTransport</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes>
+ <exclude>**</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
</project>