You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/08/11 22:03:13 UTC

[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5889

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java
new file mode 100644
index 0000000..e218180
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/OpenWireProtocolVerifier.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.protocol;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.openwire.OpenWireFormatFactory;
+
+/**
+ * Detects OpenWire by checking a byte header for the WireFormatInfo data
+ * structure type followed by the leading WireFormatInfo magic bytes.
+ */
+public class OpenWireProtocolVerifier  implements ProtocolVerifier {
+
+    protected final OpenWireFormatFactory wireFormatFactory;
+
+    public OpenWireProtocolVerifier(OpenWireFormatFactory wireFormatFactory) {
+        this.wireFormatFactory = wireFormatFactory;
+    }
+
+    /* Throws IllegalArgumentException when fewer than 8 bytes are supplied.
+     * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
+     */
+    @Override
+    public boolean isProtocol(byte[] value) {
+        if (value.length < 8) {
+           throw new IllegalArgumentException("Protocol header length changed "
+                                                 + value.length);
+        }
+
+        int start = !((OpenWireFormat)wireFormatFactory.createWireFormat()).isSizePrefixDisabled() ? 4 : 0; // skip 4 leading bytes unless the size prefix is disabled
+        int j = 0; // index into the magic bytes
+        // type: the byte at the start offset must be the WireFormatInfo data structure type
+        if (value[start] != WireFormatInfo.DATA_STRUCTURE_TYPE) {
+           return false;
+        }
+        start++;
+        WireFormatInfo info = new WireFormatInfo();
+        final byte[] magic = info.getMagic();
+        int remainingLen = value.length - start;
+        int useLen = remainingLen > magic.length ? magic.length : remainingLen; // compare only as many magic bytes as the header still holds
+        useLen += start; // convert the count into an exclusive end index into value
+        // magic: every compared byte must match, otherwise this is not OpenWire
+        for (int i = start; i < useLen; i++) {
+           if (value[i] != magic[j]) {
+              return false;
+           }
+           j++;
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java
new file mode 100644
index 0000000..c5755c7
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/ProtocolVerifier.java
@@ -0,0 +1,9 @@
+package org.apache.activemq.broker.transport.protocol;
+
+
+public interface ProtocolVerifier {
+    /** Returns true when the given header bytes are recognized as this verifier's protocol. */
+    public boolean isProtocol(byte[] value);
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java
new file mode 100644
index 0000000..f18f1c7
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/transport/protocol/StompProtocolVerifier.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.transport.protocol;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Detects STOMP by decoding the given bytes as US-ASCII and checking whether
+ * the text starts with the CONNECT or STOMP frame command.
+ */
+public class StompProtocolVerifier implements ProtocolVerifier {
+
+    /* Returns true when the decoded bytes start with "CONNECT" or "STOMP".
+     * @see org.apache.activemq.broker.transport.protocol.ProtocolVerifier#isProtocol(byte[])
+     */
+    @Override
+    public boolean isProtocol(byte[] value) {
+        String frameStart = new String(value, StandardCharsets.US_ASCII);
+        return frameStart.startsWith("CONNECT") || frameStart.startsWith("STOMP");
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
new file mode 100644
index 0000000..f922e98
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/nio/AutoInitNioSSLTransport.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.transport.nio;
+
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.util.ServiceStopper;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This transport initializes the SSLEngine and reads the first command before
+ * handing off to the detected transport.
+ * processCommand stores the plaintext buffer's backing array in read and the receive counter in readSize.
+ */
+public class AutoInitNioSSLTransport extends NIOSSLTransport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AutoInitNioSSLTransport.class);
+
+    public AutoInitNioSSLTransport(WireFormat wireFormat, SocketFactory socketFactory, URI remoteLocation, URI localLocation) throws UnknownHostException, IOException {
+        super(wireFormat, socketFactory, remoteLocation, localLocation);
+    }
+
+    public AutoInitNioSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        super(wireFormat, socket, null, null, null); // no pre-built engine, init buffer or input buffer: they are created in initializeStreams()
+    }
+
+    @Override
+    public void setSslContext(SSLContext sslContext) {
+        this.sslContext = sslContext;
+    }
+
+    public ByteBuffer getInputBuffer() {
+        return this.inputBuffer;
+    }
+
+    @Override
+    protected void initializeStreams() throws IOException {
+        NIOOutputStream outputStream = null;
+        try {
+            channel = socket.getChannel();
+            channel.configureBlocking(false);
+
+            if (sslContext == null) {
+                sslContext = SSLContext.getDefault();
+            }
+
+            String remoteHost = null;
+            int remotePort = -1;
+
+            try {
+                URI remoteAddress = new URI(this.getRemoteAddress());
+                remoteHost = remoteAddress.getHost();
+                remotePort = remoteAddress.getPort();
+            } catch (Exception e) { // ignored: host/port stay unset and the engine is created without them below
+            }
+
+            // initialize engine, the initial sslSession we get will need to be
+            // updated once the ssl handshake process is completed.
+            if (remoteHost != null && remotePort != -1) {
+                sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
+            } else {
+                sslEngine = sslContext.createSSLEngine();
+            }
+
+            sslEngine.setUseClientMode(false);
+            if (enabledCipherSuites != null) {
+                sslEngine.setEnabledCipherSuites(enabledCipherSuites);
+            }
+
+            if (enabledProtocols != null) {
+                sslEngine.setEnabledProtocols(enabledProtocols);
+            }
+
+            if (wantClientAuth) {
+                sslEngine.setWantClientAuth(wantClientAuth);
+            }
+
+            if (needClientAuth) {
+                sslEngine.setNeedClientAuth(needClientAuth);
+            }
+
+            sslSession = sslEngine.getSession();
+
+            inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
+            inputBuffer.clear();
+
+            outputStream = new NIOOutputStream(channel);
+            outputStream.setEngine(sslEngine);
+            this.dataOut = new DataOutputStream(outputStream);
+            this.buffOut = outputStream;
+            sslEngine.beginHandshake();
+            handshakeStatus = sslEngine.getHandshakeStatus();
+            doHandshake();
+           // detectReadyState();
+        } catch (Exception e) {
+            try {
+                if(outputStream != null) {
+                    outputStream.close();
+                }
+                super.closeStreams();
+            } catch (Exception ex) {} // cleanup is best effort; the original failure is rethrown below
+            throw new IOException(e);
+        }
+    }
+
+    @Override
+    protected void doOpenWireInit() throws Exception {
+        // empty override: this transport does not replay an OpenWire init buffer
+    }
+
+
+    @Override
+    protected void finishHandshake() throws Exception {
+        if (handshakeInProgress) {
+            handshakeInProgress = false;
+            nextFrameSize = -1;
+
+            // Once handshake completes we need to ask for the now real sslSession
+            // otherwise the session would return 'SSL_NULL_WITH_NULL_NULL' for the
+            // cipher suite.
+            sslSession = sslEngine.getSession();
+
+        }
+    }
+
+    public SSLEngine getSslSession() { // NOTE: despite the name, this returns the SSLEngine, not an SSLSession
+        return this.sslEngine;
+    }
+
+    public volatile byte[] read; // backing array of the buffer handed to processCommand
+    public volatile int readSize; // value of receiveCounter at the time processCommand ran
+
+    @Override
+    public void serviceRead() {
+        try {
+            if (handshakeInProgress) {
+                doHandshake();
+            }
+
+            ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
+            plain.position(plain.limit()); // start with nothing remaining so the first loop pass performs a read
+
+            while (true) {
+                if (!plain.hasRemaining()) {
+
+                    int readCount = secureRead(plain);
+
+                    if (readCount == 0) {
+                        break;
+                    }
+
+                    // channel is closed, cleanup
+                    if (readCount == -1) {
+                        onException(new EOFException());
+                        selection.close();
+                        break;
+                    }
+
+                    receiveCounter += readCount;
+                }
+
+                if (status == SSLEngineResult.Status.OK && handshakeStatus != SSLEngineResult.HandshakeStatus.NEED_UNWRAP) {
+                    processCommand(plain);
+                    //Break when command is found
+                    break;
+                }
+            }
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+            onException(IOExceptionSupport.create(e));
+        }
+    }
+
+    @Override
+    protected void processCommand(ByteBuffer plain) throws Exception {
+        read = plain.array(); // exposes the whole backing array, not just the bytes read; readSize carries the count
+        readSize = receiveCounter;
+    }
+
+
+    @Override
+    public void doStart() throws Exception {
+        taskRunnerFactory = new TaskRunnerFactory("ActiveMQ NIOSSLTransport Task");
+        // no need to init as we can delay that until demand (eg in doHandshake)
+        connect();
+        //super.doStart();
+    }
+
+
+    @Override
+    protected void doStop(ServiceStopper stopper) throws Exception {
+        if (taskRunnerFactory != null) {
+            taskRunnerFactory.shutdownNow();
+            taskRunnerFactory = null;
+        }
+//        if (selection != null) {
+//            selection.close();
+//            selection = null;
+//        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
new file mode 100644
index 0000000..d6791eb
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.AutoTcpTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
new file mode 100644
index 0000000..a03cc27
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.nio.AutoNioTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
new file mode 100644
index 0000000..74a08c1
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+nio+ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.nio.AutoNioSslTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
new file mode 100644
index 0000000..d6e626e
--- /dev/null
+++ b/activemq-broker/src/main/resources/META-INF/services/org/apache/activemq/transport/auto+ssl
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.activemq.broker.transport.auto.AutoSslTransportFactory
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-camel/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-camel/pom.xml b/activemq-camel/pom.xml
index 1f19539..11f9610 100755
--- a/activemq-camel/pom.xml
+++ b/activemq-camel/pom.xml
@@ -254,7 +254,27 @@
         </plugins>
       </build>
     </profile>
-
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml
index 3fb3ddd..81531cc 100755
--- a/activemq-client/pom.xml
+++ b/activemq-client/pom.xml
@@ -345,5 +345,26 @@
         </plugins>
       </build>
     </profile>
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
index a25737f..10eebd3 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq;
 
+import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.AccessController;
@@ -314,7 +315,22 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
      */
     protected Transport createTransport() throws JMSException {
         try {
-            return TransportFactory.connect(brokerURL);
+            URI connectBrokerUL = brokerURL;
+            String scheme = brokerURL.getScheme();
+            if (scheme == null) {
+                throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
+            }
+            if (scheme.equals("auto")) {
+                connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
+            } else if (scheme.equals("auto+ssl")) {
+                connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
+            } else if (scheme.equals("auto+nio")) {
+                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
+            } else if (scheme.equals("auto+nio+ssl")) {
+                connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
+            }
+
+            return TransportFactory.connect(connectBrokerUL);
         } catch (Exception e) {
             throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
index 38b1ae3..327ea42 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/TransportFactory.java
@@ -18,15 +18,19 @@ package org.apache.activemq.transport;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 
+import javax.net.ssl.SSLEngine;
+
 import org.apache.activemq.util.FactoryFinder;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -119,6 +123,9 @@ public abstract class TransportFactory {
             WireFormat wf = createWireFormat(options);
             Transport transport = createTransport(location, wf);
             Transport rc = configure(transport, wf, options);
+            //strip any "auto." prefixed options so they are not rejected as invalid parameters below
+            IntrospectionSupport.extractProperties(options, "auto.");
+
             if (!options.isEmpty()) {
                 throw new IllegalArgumentException("Invalid connect parameters: " + options);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
index 6b8a446..eeb68d8 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransport.java
@@ -69,16 +69,26 @@ public class NIOSSLTransport extends NIOTransport {
         super(wireFormat, socketFactory, remoteLocation, localLocation);
     }
 
-    public NIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
-        super(wireFormat, socket);
+    public NIOSSLTransport(WireFormat wireFormat, Socket socket, SSLEngine engine, InitBuffer initBuffer,
+            ByteBuffer inputBuffer) throws IOException {
+        super(wireFormat, socket, initBuffer);
+        this.sslEngine = engine; // may be null; initializeStreams() then creates the engine itself
+        if (engine != null)
+            this.sslSession = engine.getSession(); // reuse the session of the supplied engine
+        this.inputBuffer = inputBuffer;
+    }
 
     public void setSslContext(SSLContext sslContext) {
         this.sslContext = sslContext;
     }
 
+    volatile boolean hasSslEngine = false;
+
     @Override
     protected void initializeStreams() throws IOException {
+        if (sslEngine != null) {
+            hasSslEngine = true;
+        }
         NIOOutputStream outputStream = null;
         try {
             channel = socket.getChannel();
@@ -100,41 +110,66 @@ public class NIOSSLTransport extends NIOTransport {
 
             // initialize engine, the initial sslSession we get will need to be
             // updated once the ssl handshake process is completed.
-            if (remoteHost != null && remotePort != -1) {
-                sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
-            } else {
-                sslEngine = sslContext.createSSLEngine();
-            }
+            if (!hasSslEngine) {
+                if (remoteHost != null && remotePort != -1) {
+                    sslEngine = sslContext.createSSLEngine(remoteHost, remotePort);
+                } else {
+                    sslEngine = sslContext.createSSLEngine();
+                }
 
-            sslEngine.setUseClientMode(false);
-            if (enabledCipherSuites != null) {
-                sslEngine.setEnabledCipherSuites(enabledCipherSuites);
-            }
+                sslEngine.setUseClientMode(false);
+                if (enabledCipherSuites != null) {
+                    sslEngine.setEnabledCipherSuites(enabledCipherSuites);
+                }
 
-            if (enabledProtocols != null) {
-                sslEngine.setEnabledProtocols(enabledProtocols);
-            }
+                if (enabledProtocols != null) {
+                    sslEngine.setEnabledProtocols(enabledProtocols);
+                }
 
-            if (wantClientAuth) {
-                sslEngine.setWantClientAuth(wantClientAuth);
-            }
+                if (wantClientAuth) {
+                    sslEngine.setWantClientAuth(wantClientAuth);
+                }
 
-            if (needClientAuth) {
-                sslEngine.setNeedClientAuth(needClientAuth);
-            }
+                if (needClientAuth) {
+                    sslEngine.setNeedClientAuth(needClientAuth);
+                }
 
-            sslSession = sslEngine.getSession();
+                sslSession = sslEngine.getSession();
 
-            inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
-            inputBuffer.clear();
+                inputBuffer = ByteBuffer.allocate(sslSession.getPacketBufferSize());
+                inputBuffer.clear();
+            }
 
             outputStream = new NIOOutputStream(channel);
             outputStream.setEngine(sslEngine);
             this.dataOut = new DataOutputStream(outputStream);
             this.buffOut = outputStream;
-            sslEngine.beginHandshake();
+
+            //If the sslEngine was not passed in, then handshake
+            if (!hasSslEngine)
+                sslEngine.beginHandshake();
             handshakeStatus = sslEngine.getHandshakeStatus();
-            doHandshake();
+            if (!hasSslEngine)
+                doHandshake();
+
+           // if (hasSslEngine) {
+            selection = SelectorManager.getInstance().register(channel, new SelectorManager.Listener() {
+                @Override
+                public void onSelect(SelectorSelection selection) {
+                    serviceRead();
+                }
+
+                @Override
+                public void onError(SelectorSelection selection, Throwable error) {
+                    if (error instanceof IOException) {
+                        onException((IOException) error);
+                    } else {
+                        onException(IOExceptionSupport.create(error));
+                    }
+                }
+            });
+            doInit();
+
         } catch (Exception e) {
             try {
                 if(outputStream != null) {
@@ -146,6 +181,24 @@ public class NIOSSLTransport extends NIOTransport {
         }
     }
 
+    protected void doInit() throws Exception {
+        // Empty by default; called during stream setup right after the selector registration.
+    }
+
+    protected void doOpenWireInit() throws Exception {
+        //Replays the bytes held in initBuffer; called from serviceRead() rather than at setup to let wire format negotiation happen
+        if (initBuffer != null && this.wireFormat instanceof OpenWireFormat) {
+            initBuffer.buffer.flip(); // limit = current position, position = 0: makes the written bytes readable
+            if (initBuffer.buffer.hasRemaining()) {
+                nextFrameSize = -1;
+                receiveCounter += initBuffer.readSize;
+                processCommand(initBuffer.buffer);
+                processCommand(initBuffer.buffer); // NOTE(review): second call looks deliberate (presumably frame size first, then body) — confirm against processCommand
+                initBuffer.buffer.clear(); // position back to 0, so the next flip() leaves nothing remaining and later calls skip this branch
+            }
+        }
+    }
+
     protected void finishHandshake() throws Exception {
         if (handshakeInProgress) {
             handshakeInProgress = false;
@@ -176,12 +229,14 @@ public class NIOSSLTransport extends NIOTransport {
     }
 
     @Override
-    protected void serviceRead() {
+    public void serviceRead() {
         try {
             if (handshakeInProgress) {
                 doHandshake();
             }
 
+            doOpenWireInit();
+
             ByteBuffer plain = ByteBuffer.allocate(sslSession.getApplicationBufferSize());
             plain.position(plain.limit());
 
@@ -293,7 +348,7 @@ public class NIOSSLTransport extends NIOTransport {
                 doConsume(command);
                 nextFrameSize = -1;
                 currentBuffer = null;
-            }
+           }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
index 26e59e4..405314f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportFactory.java
@@ -18,20 +18,25 @@
 package org.apache.activemq.transport.nio;
 
 import java.io.IOException;
+import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.Map;
 
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLSocketFactory;
 
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.tcp.SslTransport;
+import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -44,6 +49,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
 
     protected SSLContext context;
 
+    @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         return new NIOSSLTransportServer(context, this, location, serverSocketFactory);
     }
@@ -64,6 +70,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
      * Overriding to allow for proper configuration through reflection but
      * delegate to get common configuration
      */
+    @Override
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
         if (transport instanceof SslTransport) {
             SslTransport sslTransport = (SslTransport) transport.narrow(SslTransport.class);
@@ -79,6 +86,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
     /**
      * Overriding to use SslTransports.
      */
+    @Override
     protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
 
         URI localLocation = null;
@@ -98,6 +106,13 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
         return new SslTransport(wf, (SSLSocketFactory) socketFactory, location, localLocation, false);
     }
 
+    @Override
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+            SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
+            throws IOException {
+        return new NIOSSLTransport(wireFormat, socket, engine, initBuffer, inputBuffer); // same constructor NIOSSLTransportServer calls with null for the last three arguments
+    }
+
     /**
      * Creates a new SSL SocketFactory. The given factory will use user-provided
      * key and trust managers (if the user provided them).
@@ -105,6 +120,7 @@ public class NIOSSLTransportFactory extends NIOTransportFactory {
      * @return Newly created (Ssl)SocketFactory.
      * @throws IOException
      */
+    @Override
     protected SocketFactory createSocketFactory() throws IOException {
         if (SslContext.getCurrentSslContext() != null) {
             SslContext ctx = SslContext.getCurrentSslContext();

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
index 06a310b..e07d82b 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOSSLTransportServer.java
@@ -44,7 +44,7 @@ public class NIOSSLTransportServer extends TcpTransportServer {
 
     @Override
     protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
-        NIOSSLTransport transport = new NIOSSLTransport(format, socket);
+        NIOSSLTransport transport = new NIOSSLTransport(format, socket, null, null, null);
         if (context != null) {
             transport.setSslContext(context);
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
index 6f7a1af..81a2938 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransport.java
@@ -58,6 +58,16 @@ public class NIOTransport extends TcpTransport {
         super(wireFormat, socket);
     }
 
+    /**
+     * @param format wire format, passed straight to the TcpTransport constructor
+     * @param socket socket, passed straight to the TcpTransport constructor
+     * @param initBuffer bytes already read for protocol detection, kept by TcpTransport; null when there are none
+     * @throws IOException declared by the TcpTransport constructor
+     */
+    public NIOTransport(WireFormat format, Socket socket, InitBuffer initBuffer) throws IOException {
+        super(format, socket, initBuffer);
+    }
+
     @Override
     protected void initializeStreams() throws IOException {
         channel = socket.getChannel();
@@ -91,11 +101,15 @@ public class NIOTransport extends TcpTransport {
         this.buffOut = outPutStream;
     }
 
+    protected int readFromBuffer() throws IOException {
+        return channel.read(currentBuffer); // reads from the channel into currentBuffer; serviceRead() treats a result of -1 as EOF
+    }
+
     protected void serviceRead() {
         try {
             while (true) {
 
-                int readSize = channel.read(currentBuffer);
+                int readSize = readFromBuffer();
                 if (readSize == -1) {
                     onException(new EOFException());
                     selection.close();

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
index 249e48b..fd3a1ea 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOTransportFactory.java
@@ -32,38 +32,52 @@ import javax.net.SocketFactory;
 
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.wireformat.WireFormat;
 
 public class NIOTransportFactory extends TcpTransportFactory {
 
+    @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         return new TcpTransportServer(this, location, serverSocketFactory) {
+            @Override
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
                 return new NIOTransport(format, socket);
             }
         };
     }
 
+    @Override
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
         return new NIOTransport(wf, socketFactory, location, localLocation);
     }
 
+    @Override
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+            InitBuffer initBuffer) throws IOException {
+       return new NIOTransport(wireFormat, socket, initBuffer); // replaces the TcpTransportFactory version, which only throws IOException
+    }
+
+    @Override
     protected ServerSocketFactory createServerSocketFactory() {
         return new ServerSocketFactory() {
+            @Override
             public ServerSocket createServerSocket(int port) throws IOException {
                 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                 serverSocketChannel.socket().bind(new InetSocketAddress(port));
                 return serverSocketChannel.socket();
             }
 
+            @Override
             public ServerSocket createServerSocket(int port, int backlog) throws IOException {
                 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                 serverSocketChannel.socket().bind(new InetSocketAddress(port), backlog);
                 return serverSocketChannel.socket();
             }
 
+            @Override
             public ServerSocket createServerSocket(int port, int backlog, InetAddress ifAddress) throws IOException {
                 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                 serverSocketChannel.socket().bind(new InetSocketAddress(ifAddress, port), backlog);
@@ -72,26 +86,31 @@ public class NIOTransportFactory extends TcpTransportFactory {
         };
     }
 
+    @Override
     protected SocketFactory createSocketFactory() throws IOException {
         return new SocketFactory() {
 
+            @Override
             public Socket createSocket() throws IOException {
                 SocketChannel channel = SocketChannel.open();
                 return channel.socket();
             }
 
+            @Override
             public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
                 SocketChannel channel = SocketChannel.open();
                 channel.connect(new InetSocketAddress(host, port));
                 return channel.socket();
             }
 
+            @Override
             public Socket createSocket(InetAddress address, int port) throws IOException {
                 SocketChannel channel = SocketChannel.open();
                 channel.connect(new InetSocketAddress(address, port));
                 return channel.socket();
             }
 
+            @Override
             public Socket createSocket(String address, int port, InetAddress localAddresss, int localPort) throws IOException, UnknownHostException {
                 SocketChannel channel = SocketChannel.open();
                 channel.socket().bind(new InetSocketAddress(localAddresss, localPort));
@@ -99,6 +118,7 @@ public class NIOTransportFactory extends TcpTransportFactory {
                 return channel.socket();
             }
 
+            @Override
             public Socket createSocket(InetAddress address, int port, InetAddress localAddresss, int localPort) throws IOException {
                 SocketChannel channel = SocketChannel.open();
                 channel.socket().bind(new InetSocketAddress(localAddresss, localPort));

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
index 24659f7..2b3953f 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/SslTransport.java
@@ -29,7 +29,6 @@ import javax.net.ssl.SSLSocket;
 import javax.net.ssl.SSLSocketFactory;
 
 import org.apache.activemq.command.ConnectionInfo;
-
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -46,7 +45,7 @@ import org.apache.activemq.wireformat.WireFormat;
 public class SslTransport extends TcpTransport {
     /**
      * Connect to a remote node such as a Broker.
-     * 
+     *
      * @param wireFormat The WireFormat to be used.
      * @param socketFactory The socket factory to be used. Forcing SSLSockets
      *                for obvious reasons.
@@ -76,7 +75,7 @@ public class SslTransport extends TcpTransport {
     /**
      * Initialize from a ServerSocket. No access to needClientAuth is given
      * since it is already set within the provided socket.
-     * 
+     *
      * @param wireFormat The WireFormat to be used.
      * @param socket The Socket to be used. Forcing SSL.
      * @throws IOException If TcpTransport throws.
@@ -85,12 +84,18 @@ public class SslTransport extends TcpTransport {
         super(wireFormat, socket);
     }
 
+    public SslTransport(WireFormat format, SSLSocket socket,
+            InitBuffer initBuffer) throws IOException {
+        super(format, socket, initBuffer); // TcpTransport stores initBuffer and, when non-null, unreads it into its buffered input stream
+    }
+
     /**
      * Overriding in order to add the client's certificates to ConnectionInfo
      * Commmands.
-     * 
+     *
      * @param command The Command coming in.
      */
+    @Override
     public void doConsume(Object command) {
         // The instanceof can be avoided, but that would require modifying the
         // Command clas tree and that would require too much effort right
@@ -98,15 +103,15 @@ public class SslTransport extends TcpTransport {
         if (command instanceof ConnectionInfo) {
             ConnectionInfo connectionInfo = (ConnectionInfo)command;
             connectionInfo.setTransportContext(getPeerCertificates());
-        } 
+        }
         super.doConsume(command);
     }
-    
+
     /**
      * @return peer certificate chain associated with the ssl socket
      */
     public X509Certificate[] getPeerCertificates() {
-    	
+
         SSLSocket sslSocket = (SSLSocket)this.socket;
 
         SSLSession sslSession = sslSocket.getSession();
@@ -117,13 +122,14 @@ public class SslTransport extends TcpTransport {
         } catch (SSLPeerUnverifiedException e) {
         	clientCertChain = null;
         }
-    	
+
         return clientCertChain;
     }
 
     /**
      * @return pretty print of 'this'
      */
+    @Override
     public String toString() {
         return "ssl://" + socket.getInetAddress() + ":" + socket.getPort();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
index bdb817d..6ee08b6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpBufferedInputStream.java
@@ -22,8 +22,8 @@ import java.io.InputStream;
 
 /**
  * An optimized buffered input stream for Tcp
- * 
- * 
+ *
+ *
  */
 public class TcpBufferedInputStream extends FilterInputStream {
     private static final int DEFAULT_BUFFER_SIZE = 8192;
@@ -53,6 +53,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
         }
     }
 
+    @Override
     public int read() throws IOException {
         if (position >= count) {
             fill();
@@ -81,6 +82,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
         return cnt;
     }
 
+    @Override
     public int read(byte b[], int off, int len) throws IOException {
         if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
             throw new IndexOutOfBoundsException();
@@ -105,6 +107,7 @@ public class TcpBufferedInputStream extends FilterInputStream {
         }
     }
 
+    @Override
     public long skip(long n) throws IOException {
         if (n <= 0) {
             return 0;
@@ -118,17 +121,34 @@ public class TcpBufferedInputStream extends FilterInputStream {
         return skipped;
     }
 
+    @Override
     public int available() throws IOException {
         return in.available() + (count - position);
     }
 
+    @Override
     public boolean markSupported() {
         return false;
     }
 
+    @Override
     public void close() throws IOException {
         if (in != null) {
             in.close();
         }
     }
+
+    /**
+     * @param array bytes copied to the start of internalBuffer; count grows by array.length
+     * @throws IOException if array is longer than internalBuffer.length - position
+     */
+    public void unread(byte[] array) throws IOException {
+        int avail = internalBuffer.length - position;
+        if (array.length > avail) {
+            throw new IOException("Buffer is full, can't unread");
+        }
+
+        System.arraycopy(array, position, internalBuffer, 0, array.length); // NOTE(review): srcPos is position, so arraycopy throws IndexOutOfBoundsException unless position == 0 — confirm this is only called before any read
+        count += array.length;
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
index 8f515a8..335cde7 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
@@ -16,6 +16,27 @@ gxfdgvdfg * Licensed to the Apache Software Foundation (ASF) under one or more
  */
 package org.apache.activemq.transport.tcp;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.net.SocketFactory;
+
 import org.apache.activemq.Service;
 import org.apache.activemq.TransportLoggerSupport;
 import org.apache.activemq.thread.TaskRunnerFactory;
@@ -28,18 +49,6 @@ import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.SocketFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.*;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
  *
@@ -62,6 +71,8 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
     protected DataInputStream dataIn;
     protected TimeStampStream buffOut = null;
 
+    protected final InitBuffer initBuffer;
+
     /**
      * The Traffic Class to be set on the socket.
      */
@@ -149,6 +160,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
         }
         this.remoteLocation = remoteLocation;
         this.localLocation = localLocation;
+        this.initBuffer = null;
         setDaemon(false);
     }
 
@@ -160,16 +172,22 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
      * @throws IOException
      */
     public TcpTransport(WireFormat wireFormat, Socket socket) throws IOException {
+        this(wireFormat, socket, null);
+    }
+
+    public TcpTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
         this.wireFormat = wireFormat;
         this.socket = socket;
         this.remoteLocation = null;
         this.localLocation = null;
+        this.initBuffer = initBuffer; // may be null: the two-argument constructor delegates here with null
         setDaemon(true);
     }
 
     /**
      * A one way asynchronous send
      */
+    @Override
     public void oneway(Object command) throws IOException {
         checkStarted();
         wireFormat.marshal(command, dataOut);
@@ -188,6 +206,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
     /**
      * reads packets from a Socket
      */
+    @Override
     public void run() {
         LOG.trace("TCP consumer thread for " + this + " starting");
         this.runnerThread=Thread.currentThread();
@@ -536,6 +555,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
                 // need a async task for this
                 final TaskRunnerFactory taskRunnerFactory = new TaskRunnerFactory();
                 taskRunnerFactory.execute(new Runnable() {
+                    @Override
                     public void run() {
                         LOG.trace("Closing socket {}", socket);
                         try {
@@ -609,10 +629,16 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
                 super.fill();
             }
         };
+        //Unread the initBuffer that was used for protocol detection if it exists
+        //so the stream can start over
+        if (initBuffer != null) {
+            buffIn.unread(initBuffer.buffer.array());
+        }
         this.dataIn = new DataInputStream(buffIn);
         TcpBufferedOutputStream outputStream = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
         this.dataOut = new DataOutputStream(outputStream);
         this.buffOut = outputStream;
+
     }
 
     protected void closeStreams() throws IOException {
@@ -628,6 +654,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
         this.socketOptions = new HashMap<String, Object>(socketOptions);
     }
 
+    @Override
     public String getRemoteAddress() {
         if (socket != null) {
             SocketAddress address = socket.getRemoteSocketAddress();
@@ -650,10 +677,24 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
         return super.narrow(target);
     }
 
+    @Override
     public int getReceiveCounter() {
         return receiveCounter;
     }
 
+    public static class InitBuffer { // carries bytes that were read for protocol detection before the transport was created
+        public final int readSize; // added to receiveCounter by NIOSSLTransport; presumably the number of bytes read into buffer — confirm
+        public final ByteBuffer buffer; // never null (enforced by the constructor)
+
+        public InitBuffer(int readSize, ByteBuffer buffer) {
+            if (buffer == null) {
+                throw new IllegalArgumentException("Null buffer not allowed.");
+            }
+            this.readSize = readSize; // not validated against the buffer contents
+            this.buffer = buffer; // stored by reference, not copied
+        }
+    }
+
     /**
      * @param sock The socket on which to set the Traffic Class.
      * @return Whether or not the Traffic Class was set on the given socket.

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
index 3d2fa44..ae555fd 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
@@ -17,14 +17,17 @@
 package org.apache.activemq.transport.tcp;
 
 import java.io.IOException;
+import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
+import javax.net.ssl.SSLEngine;
 
 import org.apache.activemq.TransportLoggerSupport;
 import org.apache.activemq.openwire.OpenWireFormat;
@@ -33,6 +36,7 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.WireFormatNegotiator;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
@@ -139,6 +143,15 @@ public class TcpTransportFactory extends TransportFactory {
         return createTcpTransport(wf, socketFactory, location, localLocation);
     }
 
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
+        throw new IOException("createTransport() method not implemented!"); // default for factories that do not override this; NIOTransportFactory does
+    }
+
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+            SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
+        throw new IOException("createTransport() method not implemented!"); // default for factories that do not override this; NIOSSLTransportFactory does
+    }
+
     /**
      * Allows subclasses of TcpTransportFactory to provide a create custom
      * TcpTransport instances.

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
index a0778cd..c7fe00f 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/tcp/TcpTransportServer.java
@@ -44,6 +44,7 @@ import org.apache.activemq.TransportLoggerSupport;
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.openwire.OpenWireFormatFactory;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.TransportServerThreadSupport;
 import org.apache.activemq.transport.nio.SelectorManager;
@@ -474,7 +475,11 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
         return (InetSocketAddress) serverSocket.getLocalSocketAddress();
     }
 
-    protected final void handleSocket(Socket socket) {
+    protected void handleSocket(Socket socket) {
+        doHandleSocket(socket); // the accept logic lives in the final doHandleSocket, which leaves this method overridable
+    }
+
+    final protected void doHandleSocket(Socket socket) {
         boolean closeSocket = true;
         try {
             if (this.currentTransportCount.get() >= this.maximumConnections) {
@@ -483,6 +488,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
                     "maximumConnections' property on the TCP transport configuration URI " +
                     "in the ActiveMQ configuration file (e.g., activemq.xml)");
             } else {
+                currentTransportCount.incrementAndGet();
+
                 HashMap<String, Object> options = new HashMap<String, Object>();
                 options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
                 options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
@@ -496,22 +503,23 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
                 options.put("startLogging", Boolean.valueOf(startLogging));
                 options.putAll(transportOptions);
 
-                WireFormat format = wireFormatFactory.createWireFormat();
-                Transport transport = createTransport(socket, format);
+                TransportInfo transportInfo = configureTransport(this, socket);
                 closeSocket = false;
 
-                if (transport instanceof ServiceSupport) {
-                    ((ServiceSupport) transport).addServiceListener(this);
+                if (transportInfo.transport instanceof ServiceSupport) {
+                    ((ServiceSupport) transportInfo.transport).addServiceListener(this);
                 }
 
-                Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
+                Transport configuredTransport = transportInfo.transportFactory.serverConfigure(
+                        transportInfo.transport, transportInfo.format, options);
 
                 getAcceptListener().onAccept(configuredTransport);
-                currentTransportCount.incrementAndGet();
             }
         } catch (SocketTimeoutException ste) {
             // expect this to happen
+            currentTransportCount.decrementAndGet();
         } catch (Exception e) {
+            currentTransportCount.decrementAndGet();
             if (closeSocket) {
                 try {
                     socket.close();
@@ -528,6 +536,24 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
         }
     }
 
+    protected TransportInfo configureTransport(final TcpTransportServer server, final Socket socket) throws Exception {
+        WireFormat format = wireFormatFactory.createWireFormat(); // NOTE(review): the server parameter is not used by this implementation
+        Transport transport = createTransport(socket, format);
+        return new TransportInfo(format, transport, transportFactory); // bundles what doHandleSocket needs for serverConfigure
+    }
+
+    protected class TransportInfo { // result of configureTransport: the three values doHandleSocket passes on to serverConfigure
+        final WireFormat format; // wire format the transport was created with
+        final Transport transport; // the newly created transport, not yet server-configured
+        final TransportFactory transportFactory; // factory whose serverConfigure is applied to transport
+
+        public TransportInfo(WireFormat format, Transport transport, TransportFactory transportFactory) {
+            this.format = format;
+            this.transport = transport;
+            this.transportFactory = transportFactory;
+        }
+    }
+
     public int getSoTimeout() {
         return soTimeout;
     }
@@ -567,6 +593,10 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
         this.maximumConnections = maximumConnections;
     }
 
+    public AtomicInteger getCurrentTransportCount() {
+        return currentTransportCount;
+    }
+
     @Override
     public void started(Service service) {
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-http/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-http/pom.xml b/activemq-http/pom.xml
index ef1f823..7d00276 100755
--- a/activemq-http/pom.xml
+++ b/activemq-http/pom.xml
@@ -220,6 +220,26 @@
         </plugins>
       </build>
     </profile>
-
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-jaas/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jaas/pom.xml b/activemq-jaas/pom.xml
index 48f2fb1..ce4ba07 100644
--- a/activemq-jaas/pom.xml
+++ b/activemq-jaas/pom.xml
@@ -148,5 +148,26 @@
         </plugins>
       </build>
     </profile>
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-jms-pool/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-jms-pool/pom.xml b/activemq-jms-pool/pom.xml
index 9f2c8b7..ca4321d 100755
--- a/activemq-jms-pool/pom.xml
+++ b/activemq-jms-pool/pom.xml
@@ -132,6 +132,27 @@
         </plugins>
       </build>
     </profile>
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-kahadb-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/pom.xml b/activemq-kahadb-store/pom.xml
index d647725..80787ee 100755
--- a/activemq-kahadb-store/pom.xml
+++ b/activemq-kahadb-store/pom.xml
@@ -251,7 +251,28 @@
         </plugins>
       </build>
     </profile>
-
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    
     <profile>
       <id>activemq.tests.aix.excludes</id>
       <activation>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-karaf-itest/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-karaf-itest/pom.xml b/activemq-karaf-itest/pom.xml
index 063e6e8..5044fcb 100644
--- a/activemq-karaf-itest/pom.xml
+++ b/activemq-karaf-itest/pom.xml
@@ -267,6 +267,27 @@
         </plugins>
       </build>
     </profile>
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-leveldb-store/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/pom.xml b/activemq-leveldb-store/pom.xml
index d39679d..525e413 100644
--- a/activemq-leveldb-store/pom.xml
+++ b/activemq-leveldb-store/pom.xml
@@ -512,6 +512,27 @@
           </plugins>
         </build>
       </profile>
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
 
         <profile>
             <id>activemq.tests.windows.excludes</id>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-mqtt/pom.xml b/activemq-mqtt/pom.xml
index 3aa1f70..36384e0 100755
--- a/activemq-mqtt/pom.xml
+++ b/activemq-mqtt/pom.xml
@@ -289,6 +289,27 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <includes>
+                <include>**/auto/*Test.java</include>
+              </includes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
   
   <repositories>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
index d82fcb5..0d3a838 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransport.java
@@ -23,6 +23,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 
 import javax.net.SocketFactory;
+import javax.net.ssl.SSLEngine;
 
 import org.apache.activemq.transport.nio.NIOSSLTransport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -37,7 +38,12 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
     }
 
     public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket) throws IOException {
-        super(wireFormat, socket);
+        super(wireFormat, socket, null, null, null);
+    }
+
+    public MQTTNIOSSLTransport(WireFormat wireFormat, Socket socket,
+            SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer) throws IOException {
+        super(wireFormat, socket, engine, initBuffer, inputBuffer);
     }
 
     @Override
@@ -56,4 +62,19 @@ public class MQTTNIOSSLTransport extends NIOSSLTransport {
         DataByteArrayInputStream dis = new DataByteArrayInputStream(fill);
         codec.parse(dis, fill.length);
     }
+
+    /* (non-Javadoc)
+     * @see org.apache.activemq.transport.nio.NIOSSLTransport#doInit()
+     */
+    @Override
+    protected void doInit() throws Exception {
+        if (initBuffer != null) {
+            nextFrameSize = -1;
+            receiveCounter += initBuffer.readSize;
+            initBuffer.buffer.flip();
+            processCommand(initBuffer.buffer);
+        }
+    }
+
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
index f13b537..b3a74bc 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOSSLTransportFactory.java
@@ -21,14 +21,18 @@ import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
 
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+
 import org.apache.activemq.broker.SslContext;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.wireformat.WireFormat;
 
@@ -39,6 +43,7 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
     @Override
     protected TcpTransportServer createTcpTransportServer(URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
         TcpTransportServer result = new TcpTransportServer(this, location, serverSocketFactory) {
+            @Override
             protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
                 MQTTNIOSSLTransport transport = new MQTTNIOSSLTransport(format, socket);
                 if (context != null) {
@@ -57,6 +62,13 @@ public class MQTTNIOSSLTransportFactory extends MQTTNIOTransportFactory {
     }
 
     @Override
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+            SSLEngine engine, InitBuffer initBuffer, ByteBuffer inputBuffer)
+            throws IOException {
+        return new MQTTNIOSSLTransport(wireFormat, socket, engine, initBuffer, inputBuffer);
+    }
+
+    @Override
     public TransportServer doBind(URI location) throws IOException {
         if (SslContext.getCurrentSslContext() != null) {
             try {

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
index e750366..ef33b20 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransport.java
@@ -56,6 +56,10 @@ public class MQTTNIOTransport extends TcpTransport {
         super(wireFormat, socket);
     }
 
+    public MQTTNIOTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException {
+        super(wireFormat, socket, initBuffer);
+    }
+
     @Override
     protected void initializeStreams() throws IOException {
         channel = socket.getChannel();
@@ -84,6 +88,16 @@ public class MQTTNIOTransport extends TcpTransport {
         dataOut = new DataOutputStream(outPutStream);
         buffOut = outPutStream;
         codec = new MQTTCodec(this, (MQTTWireFormat) getWireFormat());
+
+        try {
+            if (initBuffer != null) {
+                processBuffer(initBuffer.buffer, initBuffer.readSize);
+            }
+        } catch (IOException e) {
+            onException(e);
+        } catch (Throwable e) {
+            onException(IOExceptionSupport.create(e));
+        }
     }
 
     private void serviceRead() {
@@ -103,14 +117,7 @@ public class MQTTNIOTransport extends TcpTransport {
                     break;
                 }
 
-                inputBuffer.flip();
-                DataByteArrayInputStream dis = new DataByteArrayInputStream(inputBuffer.array());
-                codec.parse(dis, readSize);
-
-                receiveCounter += readSize;
-
-                // clear the buffer
-                inputBuffer.clear();
+                processBuffer(inputBuffer, readSize);
             }
         } catch (IOException e) {
             onException(e);
@@ -119,6 +126,17 @@ public class MQTTNIOTransport extends TcpTransport {
         }
     }
 
+    protected void processBuffer(ByteBuffer buffer, int readSize) throws Exception {
+        buffer.flip();
+        DataByteArrayInputStream dis = new DataByteArrayInputStream(buffer.array());
+        codec.parse(dis, readSize);
+
+        receiveCounter += readSize;
+
+        // clear the buffer
+        buffer.clear();
+    }
+
     @Override
     protected void doStart() throws Exception {
         connect();

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
index efc85be..5823add 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTNIOTransportFactory.java
@@ -33,6 +33,7 @@ import org.apache.activemq.transport.MutexTransport;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.nio.NIOTransportFactory;
 import org.apache.activemq.transport.tcp.TcpTransport;
+import org.apache.activemq.transport.tcp.TcpTransport.InitBuffer;
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
@@ -66,6 +67,12 @@ public class MQTTNIOTransportFactory extends NIOTransportFactory implements Brok
         return new MQTTNIOTransport(wf, socketFactory, location, localLocation);
     }
 
+    @Override
+    public TcpTransport createTransport(WireFormat wireFormat, Socket socket,
+            InitBuffer initBuffer) throws IOException {
+        return new MQTTNIOTransport(wireFormat, socket, initBuffer);
+    }
+
     @SuppressWarnings("rawtypes")
     @Override
     public Transport serverConfigure(Transport transport, WireFormat format, HashMap options) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
new file mode 100644
index 0000000..e777385
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioSslTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto+nio+ssl" transport scheme (SSL enabled).
+ */
+public class MQTTAutoNioSslTest extends MQTTTest {
+
+    @Override
+    public String getProtocolScheme() {
+        return "auto+nio+ssl";
+    }
+
+    @Override
+    public boolean isUseSSL() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
new file mode 100644
index 0000000..f7023a3
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoNioTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto+nio" transport scheme (SSL disabled).
+ */
+public class MQTTAutoNioTest extends MQTTTest {
+
+    @Override
+    public String getProtocolScheme() {
+        return "auto+nio";
+    }
+
+    @Override
+    public boolean isUseSSL() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
new file mode 100644
index 0000000..e31f494
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoSslTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto+ssl" transport scheme (SSL enabled).
+ */
+public class MQTTAutoSslTest extends MQTTTest {
+
+    @Override
+    public String getProtocolScheme() {
+        return "auto+ssl";
+    }
+
+    @Override
+    public boolean isUseSSL() {
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
new file mode 100644
index 0000000..7471f6e
--- /dev/null
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/auto/MQTTAutoTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.mqtt.auto;
+
+import org.apache.activemq.transport.mqtt.MQTTTest;
+
+/**
+ * Run the basic MQTT tests using the "auto" transport scheme (SSL disabled).
+ */
+public class MQTTAutoTest extends MQTTTest {
+
+    @Override
+    public String getProtocolScheme() {
+        return "auto";
+    }
+
+    @Override
+    public boolean isUseSSL() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-partition/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-partition/pom.xml b/activemq-partition/pom.xml
index fa2bc8f..3df537e 100644
--- a/activemq-partition/pom.xml
+++ b/activemq-partition/pom.xml
@@ -130,6 +130,27 @@
         </plugins>
       </build>
     </profile>
+	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
 
   </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/activemq/blob/04ee70a1/activemq-pool/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-pool/pom.xml b/activemq-pool/pom.xml
index 2b9d751..d94dde4 100755
--- a/activemq-pool/pom.xml
+++ b/activemq-pool/pom.xml
@@ -104,5 +104,29 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
+  
+  <profiles>
+  	<profile>
+      <id>activemq.tests-autoTransport</id>
+      <activation>
+        <property>
+          <name>activemq.tests</name>
+          <value>autoTransport</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+              <excludes>
+                <exclude>**</exclude>
+              </excludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
 
 </project>