You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/12 16:44:20 UTC

[1/2] qpid-jms git commit: QPIDJMS-232 Perform authentication on transport connect

Repository: qpid-jms
Updated Branches:
  refs/heads/master ae1ce8671 -> 22bbb5da4


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
index 3fabf73..188e6d8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
@@ -52,7 +52,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore";
     private static final String PASSWORD = "password";
 
-    @Test(timeout = 20000)
+    @Test //(timeout = 20000)  // TODO
     public void testSaslExternalConnection() throws Exception {
         TransportSslOptions sslOptions = new TransportSslOptions();
         sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
@@ -70,7 +70,9 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer(context, true);) {
             // Expect an EXTERNAL connection
-            testPeer.expectSaslExternalConnect();
+            testPeer.expectSaslExternal();
+            testPeer.expectOpen();
+
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -95,7 +97,9 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
             String user = "user";
             String pass = "qwerty123456";
 
-            testPeer.expectSaslPlainConnect(user, pass, null, null);
+            testPeer.expectSaslPlain(user, pass);
+            testPeer.expectOpen();
+
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -116,7 +120,8 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     public void testSaslAnonymousConnection() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             // Expect an ANOYMOUS connection
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -172,7 +177,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     private void doMechanismSelectedTestImpl(String username, String password, Symbol clientSelectedMech, Symbol[] serverMechs, boolean wait) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
-            testPeer.expectFailingSaslConnect(serverMechs, clientSelectedMech);
+            testPeer.expectFailingSaslAuthentication(serverMechs, clientSelectedMech);
 
             ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=myclientid");
             try {
@@ -223,7 +228,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
                                "transport.keyStorePassword=" + PASSWORD;
             }
 
-            testPeer.expectFailingSaslConnect(serverMechs, clientSelectedMech);
+            testPeer.expectFailingSaslAuthentication(serverMechs, clientSelectedMech);
 
             JmsConnectionFactory factory = new JmsConnectionFactory("amqps://localhost:" + testPeer.getServerPort() + connOptions);
             try {
@@ -285,7 +290,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     private void doMechanismSelectionRestrictedTestImpl(String username, String password, Symbol clientSelectedMech, Symbol[] serverMechs, String mechanismsOptionValue) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
-            testPeer.expectFailingSaslConnect(serverMechs, clientSelectedMech);
+            testPeer.expectFailingSaslAuthentication(serverMechs, clientSelectedMech);
 
             String uriOptions = "?jms.clientID=myclientid";
             if(mechanismsOptionValue != null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
index 7ac99f8..679c6f5 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConnectionInfoTest.java
@@ -71,7 +71,7 @@ public class JmsConnectionInfoTest {
         info.setClientId("test", true);
         info.setCloseTimeout(100);
         info.setConnectTimeout(200);
-        info.setForceAsyncSends(true);
+        info.setForceAsyncSend(true);
         info.setPassword("pass");
         info.setQueuePrefix("queue");
         info.setRequestTimeout(50);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderWrapperTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderWrapperTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderWrapperTest.java
index 3da448b..3391ffc 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderWrapperTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderWrapperTest.java
@@ -25,6 +25,7 @@ import java.net.URI;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConnectionId;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
@@ -127,8 +128,11 @@ public class ProviderWrapperTest extends QpidJmsTestCase{
         Provider mockProvider = Mockito.mock(Provider.class);
         ProviderWrapper<Provider> wrapper = new ProviderWrapper<Provider>(mockProvider);
 
-        wrapper.connect();
-        Mockito.verify(mockProvider).connect();
+        JmsConnectionId id = new JmsConnectionId("ID:1");
+        JmsConnectionInfo connectionInfo = new JmsConnectionInfo(id);
+
+        wrapper.connect(connectionInfo);
+        Mockito.verify(mockProvider).connect(connectionInfo);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 8b6ebcb..26ccdcf 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -54,12 +54,15 @@ public class AmqpProviderTest extends QpidJmsTestCase {
     private TestAmqpPeer testPeer;
     private URI peerURI;
     private AmqpProvider provider;
+    private JmsConnectionInfo connectionInfo;
 
     @Override
     @Before
     public void setUp() throws Exception {
         testPeer = new TestAmqpPeer();
+        testPeer.setSuppressReadExceptionOnClose(true);
         peerURI = new URI("amqp://localhost:" + testPeer.getServerPort());
+        connectionInfo = new JmsConnectionInfo(new JmsConnectionId("ID:TEST-Connection:1"));
     }
 
     @Override
@@ -92,7 +95,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         provider = new AmqpProvider(peerURI);
         provider.setTransportType("ftp");
         try {
-            provider.connect();
+            provider.connect(connectionInfo);
             fail("Should have failed to connect.");
         } catch (Exception ex) {
         }
@@ -103,7 +106,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         provider = new AmqpProvider(peerURI);
         testPeer.close();
         try {
-            provider.connect();
+            provider.connect(connectionInfo);
             fail("Should have failed to connect.");
         } catch (Exception ex) {
         }
@@ -111,8 +114,10 @@ public class AmqpProviderTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testStartThrowsIfNoListenerSet() throws Exception {
+        testPeer.expectSaslAnonymous();
+
         provider = new AmqpProvider(peerURI);
-        provider.connect();
+        provider.connect(connectionInfo);
 
         try {
             provider.start();
@@ -123,16 +128,20 @@ public class AmqpProviderTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testToString() throws IOException {
+        testPeer.expectSaslAnonymous();
+
         provider = new AmqpProvider(peerURI);
-        provider.connect();
+        provider.connect(connectionInfo);
         assertTrue(provider.toString().contains("localhost"));
         assertTrue(provider.toString().contains(String.valueOf(peerURI.getPort())));
     }
 
     @Test(timeout=20000)
     public void testClosedProviderThrowsIOException() throws IOException {
+        testPeer.expectSaslAnonymous();
+
         provider = new AmqpProvider(peerURI);
-        provider.connect();
+        provider.connect(connectionInfo);
         provider.close();
 
         try {
@@ -141,7 +150,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         } catch (IOException ex) {}
 
         try {
-            provider.connect();
+            provider.connect(connectionInfo);
             fail("Should have thrown an IOException when closed.");
         } catch (IOException ex) {}
 
@@ -159,10 +168,14 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         final long SEND_TIMEOUT = TimeUnit.SECONDS.toMillis(6);
         final long REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(7);
 
+        connectionInfo.setUsername(TEST_USERNAME);
+        connectionInfo.setPassword(TEST_PASSWORD);
+
         provider = new AmqpProvider(peerURI);
-        testPeer.expectSaslPlainConnect(TEST_USERNAME, TEST_PASSWORD, null, null);
+        testPeer.expectSaslPlain(TEST_USERNAME, TEST_PASSWORD);
+        testPeer.expectOpen();
         testPeer.expectBegin();
-        provider.connect();
+        provider.connect(connectionInfo);
         testPeer.expectClose();
 
         JmsConnectionInfo connectionInfo = createConnectionInfo();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index f90da49..9c35006 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -111,7 +111,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             originalPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
 
             final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
@@ -171,7 +172,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Final peer is at: {}", finalURI);
 
             // Connect to the first
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
 
             final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);
@@ -200,7 +202,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             // Set expectations on rejecting and final peer
             rejectingPeer.expectSaslHeaderThenDrop();
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
 
             // Close the original peer and wait for things to shake out.
@@ -233,7 +236,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Final peer is at: {}", finalURI);
 
             // Connect to the first peer
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
 
             final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
@@ -302,7 +306,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             originalPeer.waitForAllHandlersToComplete(3000);
 
             // Set the secondary peer to expect connection restoration, this time send disposition accepting the message
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectSenderAttach();
@@ -343,7 +348,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Final peer is at: {}", finalURI);
 
             // Connect to the first peer
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
             originalPeer.expectBegin();
             originalPeer.dropAfterLastHandler();
@@ -374,7 +380,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             // --- Post Failover Expectations of FinalPeer --- //
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             finalPeer.expectBegin();
 
@@ -399,7 +406,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Original peer is at: {}", originalURI);
 
             // Connect to the first peer
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
 
             int delay = 20000;
@@ -438,7 +446,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Final peer is at: {}", finalURI);
 
             // Connect to the first peer
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
             originalPeer.expectBegin();
             originalPeer.expectReceiverAttach();
@@ -470,7 +479,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             // --- Post Failover Expectations of FinalPeer --- //
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectReceiverAttach();
@@ -511,7 +521,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Final peer is at: {}", finalURI);
 
             // Connect to the first peer
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
             originalPeer.expectBegin();
             originalPeer.expectReceiverAttach();
@@ -543,7 +554,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             // --- Post Failover Expectations of FinalPeer --- //
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectReceiverAttach();
@@ -587,7 +599,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Final peer is at: {}", finalURI);
 
             // Connect to the first peer
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
             originalPeer.expectBegin();
             originalPeer.expectReceiverAttach();
@@ -621,7 +634,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectReceiverAttach();
@@ -665,7 +679,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Final peer is at: {}", finalURI);
 
             // Connect to the first peer
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
 
             final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
@@ -698,7 +713,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             // --- Post Failover Expectations of FinalPeer --- //
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectQueueBrowserAttach();
@@ -737,7 +753,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
     private void doCreateConsumerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
 
             Connection connection = establishAnonymousConnecton(testPeer);
@@ -790,7 +807,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Original peer is at: {}", originalURI);
             LOG.info("Final peer is at: {}", finalURI);
 
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
 
             final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
@@ -827,7 +845,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             // --- Post Failover Expectations of FinalPeer --- //
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             finalPeer.expectBegin();
             finalPeer.expectCoordinatorAttach();
@@ -868,7 +887,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Original peer is at: {}", originalURI);
             LOG.info("Final peer is at: {}", finalURI);
 
-            originalPeer.expectSaslAnonymousConnect();
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
             originalPeer.expectBegin();
             originalPeer.expectBegin();
             String dynamicAddress1 = "myTempTopicAddress";
@@ -899,7 +919,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
 
             // --- Post Failover Expectations of FinalPeer --- //
 
-            finalPeer.expectSaslAnonymousConnect();
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
             finalPeer.expectBegin();
             String dynamicAddress2 = "myTempTopicAddress2";
             finalPeer.expectTempTopicCreationAttach(dynamicAddress2);
@@ -943,7 +964,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Original peer is at: {}", peerURI);
 
             // Connect to the test peer
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.dropAfterLastHandler();
 
@@ -998,7 +1020,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Original peer is at: {}", peerURI);
 
             // Connect to the test peer
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1060,7 +1083,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             LOG.info("Original peer is at: {}", peerURI);
 
             // Connect to the test peer
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.dropAfterLastHandler();
@@ -1120,7 +1144,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             final Connection connection = establishAnonymousConnecton(
                 "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);
 
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1157,7 +1182,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             final JmsConnection connection = establishAnonymousConnecton(
                 "failover.reconnectDelay=2000&failover.maxReconnectAttempts=5", testPeer);
 
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1214,7 +1240,8 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
                 "failover.reconnectDelay=2000&failover.maxReconnectAttempts=60",
                 testPeer);
 
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
index d77b5d4..af190e6 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
@@ -61,7 +61,7 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
 
     @Test(timeout=30000, expected=IOException.class)
     public void testConnect() throws Exception {
-        provider.connect();
+        provider.connect(connection);
     }
 
     @Test(timeout=30000, expected=IOException.class)

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
index bd2ed86..e543eec 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
@@ -63,6 +63,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     private List<URI> uris;
     private FailoverProvider provider;
+    private JmsConnectionInfo connection;
 
     @Override
     @Before
@@ -74,6 +75,8 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         uris.add(new URI("mock://192.168.2.3:5672"));
         uris.add(new URI("mock://192.168.2.4:5672"));
 
+        connection = createConnectionInfo();
+
         super.setUp();
     }
 
@@ -130,7 +133,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
 
         assertNull(provider.getRemoteURI());
-        provider.connect();
+        provider.connect(connection);
         assertTrue("Should have a remote URI after connect", Wait.waitFor(new Wait.Condition() {
 
             @Override
@@ -145,7 +148,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
 
         assertNotNull(provider.toString());
-        provider.connect();
+        provider.connect(connection);
         assertTrue("Should have a mock scheme after connect", Wait.waitFor(new Wait.Condition() {
 
             @Override
@@ -172,7 +175,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
             }
         });
 
-        provider.connect();
+        provider.connect(connection);
 
         ProviderFuture request = new ProviderFuture();
         provider.create(createConnectionInfo(), request);
@@ -192,7 +195,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
-        provider.connect();
+        provider.connect(connection);
 
         try {
             provider.start();
@@ -398,7 +401,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
             }
         });
 
-        provider.connect();
+        provider.connect(connection);
         provider.start();
 
         JmsConnectionInfo connectionInfo = createConnectionInfo();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
index 0191d90..5c9bef6 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
@@ -60,7 +60,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
             final String redirectURI = createPeerURI(redirectedPeer);
             LOG.info("Backup peer is at: {}", redirectURI);
 
-            redirectedPeer.expectSaslAnonymousConnect();
+            redirectedPeer.expectSaslAnonymous();
+            redirectedPeer.expectOpen();
             redirectedPeer.expectBegin();
 
             Map<Symbol, Object> redirectInfo = new HashMap<Symbol, Object>();
@@ -103,7 +104,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
             final String redirectURI = createPeerURI(redirectedPeer);
             LOG.info("Primary is at {}: Backup peer is at: {}", rejectingURI, redirectURI);
 
-            redirectedPeer.expectSaslAnonymousConnect();
+            redirectedPeer.expectSaslAnonymous();
+            redirectedPeer.expectOpen();
             redirectedPeer.expectBegin();
 
             Map<Symbol, Object> redirectInfo = new HashMap<Symbol, Object>();
@@ -111,7 +113,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
             redirectInfo.put(NETWORK_HOST, "localhost");
             redirectInfo.put(PORT, redirectedPeer.getServerPort());
 
-            rejectingPeer.expectSaslAnonymousConnect();
+            rejectingPeer.expectSaslAnonymous();
+            rejectingPeer.expectOpen();
             rejectingPeer.expectBegin();
             rejectingPeer.remotelyCloseConnection(true, ConnectionError.REDIRECT, "Server is full, go away", redirectInfo);
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index 187a81f..e822239 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -92,7 +92,7 @@ public class MockProvider implements Provider {
     }
 
     @Override
-    public void connect() throws IOException {
+    public void connect(JmsConnectionInfo connectionInfo) throws IOException {
         checkClosed();
 
         stats.recordConnectAttempt();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 84da3c2..b8e065f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -421,9 +421,7 @@ public class TestAmqpPeer implements AutoCloseable
         return openFrame;
     }
 
-    public void expectSaslConnect(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Symbol[] desiredCapabilities, Symbol[] serverCapabilities,
-                                  Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, Matcher<?> idleTimeoutMatcher,
-                                  Matcher<?> hostnameMatcher, boolean deferOpened)
+    public void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher)
     {
         SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
         addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
@@ -459,55 +457,9 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(saslInitMatcher);
 
         addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
-
-        OpenFrame open = createOpenFrame();
-        if(serverCapabilities != null)
-        {
-            open.setOfferedCapabilities(serverCapabilities);
-        }
-
-        if(serverProperties != null)
-        {
-            open.setProperties(serverProperties);
-        }
-
-        OpenMatcher openMatcher = new OpenMatcher().withContainerId(notNullValue(String.class));
-        if (!deferOpened) {
-            openMatcher.onCompletion(new FrameSender(this, FrameType.AMQP, 0, open, null));
-        }
-
-        if (desiredCapabilities != null)
-        {
-            openMatcher.withDesiredCapabilities(arrayContaining(desiredCapabilities));
-        }
-        else
-        {
-            openMatcher.withDesiredCapabilities(nullValue());
-        }
-
-        if(idleTimeoutMatcher !=null)
-        {
-            openMatcher.withIdleTimeOut(idleTimeoutMatcher);
-        }
-
-        if(hostnameMatcher != null)
-        {
-            openMatcher.withHostname(hostnameMatcher);
-        }
-
-        if(clientPropertiesMatcher != null) {
-            openMatcher.withProperties(clientPropertiesMatcher);
-        }
-
-        addHandler(openMatcher);
-    }
-
-    public void expectSaslPlainConnect(String username, String password, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties)
-    {
-        expectSaslPlainConnect(username, password, new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, serverCapabilities, serverProperties);
     }
 
-    public void expectSaslPlainConnect(String username, String password, Symbol[] desiredCapabilities, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties)
+    public void expectSaslPlain(String username, String password)
     {
         byte[] usernameBytes = username.getBytes();
         byte[] passwordBytes = password.getBytes();
@@ -517,40 +469,30 @@ public class TestAmqpPeer implements AutoCloseable
 
         Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data));
 
-        expectSaslConnect(PLAIN, initialResponseMatcher, desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false);
+        expectSaslAuthentication(PLAIN, initialResponseMatcher, null);
     }
 
-    public void expectSaslExternalConnect()
+    public void expectSaslExternal()
     {
         if(!_driverRunnable.isNeedClientCert())
         {
             throw new IllegalStateException("need-client-cert must be enabled on the test peer");
         }
 
-        expectSaslConnect(EXTERNAL, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, false);
+        expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null);
     }
 
-    public void expectSaslAnonymousConnect()
+    public void expectSaslAnonymous()
     {
-        expectSaslAnonymousConnect(null, null);
+        expectSaslAnonymous(null);
     }
 
-    public void expectSaslAnonymousConnect(boolean deferOpened)
+    public void expectSaslAnonymous(Matcher<?> hostnameMatcher)
     {
-        expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, null, null, null, null, deferOpened);
+        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher);
     }
 
-    public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher)
-    {
-        expectSaslAnonymousConnect(idleTimeoutMatcher, hostnameMatcher, null, null);
-    }
-
-    public void expectSaslAnonymousConnect(Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, Matcher<?> propertiesMatcher, Map<Symbol, Object> serverProperties)
-    {
-        expectSaslConnect(ANONYMOUS, equalTo(new Binary(new byte[0])), new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, propertiesMatcher, serverProperties, idleTimeoutMatcher, hostnameMatcher, false);
-    }
-
-    public void expectFailingSaslConnect(Symbol[] serverMechs, Symbol clientSelectedMech)
+    public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech)
     {
         SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(serverMechs);
         addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
@@ -604,12 +546,76 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(openMatcher);
     }
 
+    public void expectOpen() {
+        expectOpen(false);
+    }
+
+    public void expectOpen(boolean deferOpened) {
+        expectOpen(null, null, deferOpened);
+    }
+
+    public void expectOpen(Map<Symbol, Object> serverProperties) {
+        expectOpen(new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, serverProperties, null, null, false);
+    }
+
+    public void expectOpen(Matcher<?> clientPropertiesMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) {
+        expectOpen(clientPropertiesMatcher, nullValue(), hostnameMatcher, deferOpened);
+    }
+
+    public void expectOpen(Matcher<?> clientPropertiesMatcher, Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) {
+        expectOpen(new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, clientPropertiesMatcher, null, null, hostnameMatcher, deferOpened);
+    }
+
+    public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties) {
+        expectOpen(desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false);
+    }
+
+    public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities,
+                           Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties,
+                           Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) {
+
+        OpenFrame open = createOpenFrame();
+        if (serverCapabilities != null) {
+            open.setOfferedCapabilities(serverCapabilities);
+        }
+
+        if (serverProperties != null) {
+            open.setProperties(serverProperties);
+        }
+
+        OpenMatcher openMatcher = new OpenMatcher().withContainerId(notNullValue(String.class));
+        if (!deferOpened) {
+            openMatcher.onCompletion(new FrameSender(this, FrameType.AMQP, 0, open, null));
+        }
+
+        if (desiredCapabilities != null) {
+            openMatcher.withDesiredCapabilities(arrayContaining(desiredCapabilities));
+        } else {
+            openMatcher.withDesiredCapabilities(nullValue());
+        }
+
+        if (idleTimeoutMatcher != null) {
+            openMatcher.withIdleTimeOut(idleTimeoutMatcher);
+        }
+
+        if (hostnameMatcher != null) {
+            openMatcher.withHostname(hostnameMatcher);
+        }
+
+        if (clientPropertiesMatcher != null) {
+            openMatcher.withProperties(clientPropertiesMatcher);
+        }
+
+        addHandler(openMatcher);
+    }
+
     public void rejectConnect(Symbol errorType, String errorMessage, Map<Symbol, Object> errorInfo) {
         // Expect a connection, establish through the SASL negotiation and sending of the Open frame
         Map<Symbol, Object> serverProperties = new HashMap<Symbol, Object>();
         serverProperties.put(AmqpSupport.CONNECTION_OPEN_FAILED, true);
 
-        expectSaslAnonymousConnect(null, null, null, serverProperties);
+        expectSaslAnonymous();
+        expectOpen(serverProperties);
 
         // Now generate the Close frame with the supplied error
         final FrameSender closeSender = createCloseFrameSender(errorType, errorMessage, errorInfo, 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDJMS-232 Perform authentication on transport connect

Posted by ta...@apache.org.
QPIDJMS-232 Perform authentication on transport connect

Do the authentication on transport connect, as opposed to waiting until
the Connection is first used, to allow for early detection of a connect
failure due to bad credentials.  The Connection will now fail from the
ConnectionFactory createConnection call if the user cannot be
authenticated, instead of waiting until the connection is used.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/22bbb5da
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/22bbb5da
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/22bbb5da

Branch: refs/heads/master
Commit: 22bbb5da45f03f6534af85d056aa249bf9e51ac0
Parents: ae1ce86
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Dec 12 11:43:07 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Dec 12 11:43:07 2016 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  63 +++++---
 .../apache/qpid/jms/JmsConnectionFactory.java   |  94 ++++++++----
 .../org/apache/qpid/jms/JmsQueueConnection.java |   6 +-
 .../org/apache/qpid/jms/JmsTopicConnection.java |   6 +-
 .../apache/qpid/jms/meta/JmsConnectionInfo.java |   2 +-
 .../org/apache/qpid/jms/provider/Provider.java  |   6 +-
 .../qpid/jms/provider/ProviderWrapper.java      |   5 +-
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 123 ++++++++++------
 .../provider/amqp/AmqpSaslAuthenticator.java    |  53 +++++--
 .../jms/provider/failover/FailoverProvider.java |   6 +-
 .../org/apache/qpid/jms/JmsConnectionTest.java  |  59 ++++----
 .../qpid/jms/JmsConnectionTestSupport.java      |  20 ++-
 .../ConnectionFactoryIntegrationTest.java       |  90 +++++++++++-
 .../integration/ConnectionIntegrationTest.java  |   6 +-
 .../FailedConnectionsIntegrationTest.java       |   3 +-
 .../integration/IdleTimeoutIntegrationTest.java |  24 +++-
 .../jms/integration/IntegrationTestFixture.java |   6 +-
 .../integration/ProducerIntegrationTest.java    |  36 +++--
 .../jms/integration/SaslIntegrationTest.java    |  19 ++-
 .../qpid/jms/meta/JmsConnectionInfoTest.java    |   2 +-
 .../qpid/jms/provider/ProviderWrapperTest.java  |   8 +-
 .../jms/provider/amqp/AmqpProviderTest.java     |  29 ++--
 .../failover/FailoverIntegrationTest.java       |  81 +++++++----
 .../failover/FailoverProviderClosedTest.java    |   2 +-
 .../provider/failover/FailoverProviderTest.java |  13 +-
 .../provider/failover/FailoverRedirectTest.java |   9 +-
 .../qpid/jms/provider/mock/MockProvider.java    |   2 +-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 144 ++++++++++---------
 28 files changed, 614 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 0837d78..10031a6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -78,7 +78,6 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.ProviderSynchronization;
-import org.apache.qpid.jms.util.IdGenerator;
 import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,7 +89,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
 
     private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class);
 
-    private final IdGenerator clientIdGenerator;
     private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<JmsSessionId, JmsSession>();
     private final AtomicBoolean connected = new AtomicBoolean();
     private final AtomicBoolean closed = new AtomicBoolean();
@@ -115,7 +113,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
 
     private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<AsyncResult, AsyncResult>();
 
-    protected JmsConnection(final String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+    protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provider) throws JMSException {
 
         // This executor can be used for dispatching asynchronous tasks that might block or result
         // in reentrant calls to this Connection that could block.  The thread in this executor
@@ -124,7 +122,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
             @Override
             public Thread newThread(Runnable target) {
-                Thread thread = new Thread(target, "QpidJMS Connection Executor: " + connectionId);
+                Thread thread = new Thread(target, "QpidJMS Connection Executor: " + connectionInfo.getId());
                 thread.setDaemon(false);
                 return thread;
             }
@@ -147,8 +145,31 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
             throw JmsExceptionSupport.create(e);
         }
 
-        this.clientIdGenerator = clientIdGenerator;
-        this.connectionInfo = new JmsConnectionInfo(new JmsConnectionId(connectionId));
+        this.connectionInfo = connectionInfo;
+    }
+
+    JmsConnection connect() throws JMSException {
+        if (provider == null) {
+            throw new IllegalStateException("Remote provider instance not set.");
+        }
+
+        try {
+            provider.connect(connectionInfo);
+        } catch (Exception ex) {
+            LOG.error("Failed to connect to remote at: {}", connectionInfo.getConfiguredURI());
+            LOG.trace("Error: ", ex);
+            try {
+                provider.close();
+            } catch (Throwable ignored) {}
+
+            throw JmsExceptionSupport.create(ex);
+        }
+
+        if (connectionInfo.isExplicitClientID()) {
+            createJmsConnection();
+        }
+
+        return this;
     }
 
     @Override
@@ -269,7 +290,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     @Override
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
         JmsSession result = new JmsSession(this, getNextSessionId(), ackMode);
         addSession(result.getSessionInfo(), result);
@@ -282,7 +303,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     @Override
     public synchronized String getClientID() throws JMSException {
         checkClosedOrFailed();
-        return this.connectionInfo.getClientId();
+        return connected.get() ? connectionInfo.getClientId() : null;
     }
 
     @Override
@@ -309,13 +330,13 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
 
         // We weren't connected if we got this far, we should now connect to ensure the
         // configured clientID is valid.
-        connect();
+        createJmsConnection();
     }
 
     @Override
     public void start() throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         if (started.compareAndSet(false, true)) {
             try {
                 for (JmsSession s : sessions.values()) {
@@ -363,14 +384,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     @Override
     public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         throw new JMSException("Not supported");
     }
 
     @Override
     public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         throw new JMSException("Not supported");
     }
 
@@ -378,7 +399,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
                                                               String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         throw new JMSException("Not supported");
     }
 
@@ -386,7 +407,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         throw new JMSException("Not supported");
     }
 
@@ -394,7 +415,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         throw new JMSException("Not supported");
     }
 
@@ -402,14 +423,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         throw new JMSException("Not supported");
     }
 
     @Override
     public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
         JmsTopicSession result = new JmsTopicSession(this, getNextSessionId(), ackMode);
         addSession(result.getSessionInfo(), result);
@@ -422,7 +443,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     @Override
     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
         checkClosedOrFailed();
-        connect();
+        createJmsConnection();
         int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
         JmsQueueSession result = new JmsQueueSession(this, getNextSessionId(), ackMode);
         addSession(result.getSessionInfo(), result);
@@ -466,7 +487,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         sessions.put(sessionInfo.getId(), session);
     }
 
-    private void connect() throws JMSException {
+    private void createJmsConnection() throws JMSException {
         if (isConnected() || closed.get()) {
             return;
         }
@@ -477,7 +498,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
             }
 
             if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) {
-                connectionInfo.setClientId(clientIdGenerator.generateId(), false);
+                throw new IllegalArgumentException("Client ID cannot be null or empty string");
             }
 
             createResource(connectionInfo);
@@ -848,7 +869,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     public void setForceAsyncSend(boolean forceAsyncSend) {
-        connectionInfo.setForceAsyncSends(forceAsyncSend);
+        connectionInfo.setForceAsyncSend(forceAsyncSend);
     }
 
     public boolean isForceSyncSend() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index 5547f11..1fdb7fc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -35,6 +35,7 @@ import javax.jms.TopicConnectionFactory;
 
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.jndi.JNDIStorable;
+import org.apache.qpid.jms.meta.JmsConnectionId;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.policy.JmsDefaultDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsDefaultMessageIDPolicy;
@@ -159,14 +160,25 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
 
     @Override
     public TopicConnection createTopicConnection(String username, String password) throws JMSException {
+        JmsTopicConnection connection = null;
+
         try {
-            String connectionId = getConnectionIdGenerator().generateId();
+            JmsConnectionInfo connectionInfo = configureConnectionInfo(username, password);
             Provider provider = createProvider(remoteURI);
-            JmsTopicConnection result = new JmsTopicConnection(connectionId, provider, getClientIdGenerator());
-            return configureConnection(result, username, password);
+
+            connection = new JmsTopicConnection(connectionInfo, provider);
+            connection.setExceptionListener(exceptionListener);
+            connection.connect();
         } catch (Exception e) {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (Throwable ignored) {}
+            }
             throw JmsExceptionSupport.create(e);
         }
+
+        return connection;
     }
 
     @Override
@@ -176,14 +188,25 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
 
     @Override
     public Connection createConnection(String username, String password) throws JMSException {
+        JmsConnection connection = null;
+
         try {
-            String connectionId = getConnectionIdGenerator().generateId();
+            JmsConnectionInfo connectionInfo = configureConnectionInfo(username, password);
             Provider provider = createProvider(remoteURI);
-            JmsConnection result = new JmsConnection(connectionId, provider, getClientIdGenerator());
-            return configureConnection(result, username, password);
+
+            connection = new JmsConnection(connectionInfo, provider);
+            connection.setExceptionListener(exceptionListener);
+            connection.connect();
         } catch (Exception e) {
-            throw JmsExceptionSupport.create("Failed to create connection to: " + getRemoteURI(), e);
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (Throwable ignored) {}
+            }
+            throw JmsExceptionSupport.create(e);
         }
+
+        return connection;
     }
 
     @Override
@@ -193,45 +216,61 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
 
     @Override
     public QueueConnection createQueueConnection(String username, String password) throws JMSException {
+        JmsQueueConnection connection = null;
+
         try {
-            String connectionId = getConnectionIdGenerator().generateId();
+            JmsConnectionInfo connectionInfo = configureConnectionInfo(username, password);
             Provider provider = createProvider(remoteURI);
-            JmsQueueConnection result = new JmsQueueConnection(connectionId, provider, getClientIdGenerator());
-            return configureConnection(result, username, password);
+
+            connection = new JmsQueueConnection(connectionInfo, provider);
+            connection.setExceptionListener(exceptionListener);
+            connection.connect();
         } catch (Exception e) {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (Throwable ignored) {}
+            }
             throw JmsExceptionSupport.create(e);
         }
+
+        return connection;
     }
 
-    protected <T extends JmsConnection> T configureConnection(T connection, String username, String password) throws JMSException {
+    protected JmsConnectionInfo configureConnectionInfo(String username, String password) throws JMSException {
         try {
             Map<String, String> properties = PropertyUtil.getProperties(this);
             // We must ensure that we apply the clientID last, since setting it on
             // the Connection object provokes establishing the underlying connection.
-            boolean setClientID = false;
+            boolean userSpecifiedClientId = false;
             if (properties.containsKey(CLIENT_ID_PROP)) {
-                setClientID = true;
+                userSpecifiedClientId = true;
                 properties.remove(CLIENT_ID_PROP);
             }
 
+            String connectionId = getConnectionIdGenerator().generateId();
+
             // Copy the configured policies before applying URI options that
             // might make additional configuration changes.
-            connection.setMessageIDPolicy(messageIDPolicy.copy());
-            connection.setPrefetchPolicy(prefetchPolicy.copy());
-            connection.setPresettlePolicy(presettlePolicy.copy());
-            connection.setRedeliveryPolicy(redeliveryPolicy.copy());
-            connection.setDeserializationPolicy(deserializationPolicy.copy());
-
-            PropertyUtil.setProperties(connection, properties);
-            connection.setExceptionListener(exceptionListener);
-            connection.setUsername(username);
-            connection.setPassword(password);
-            connection.setConfiguredURI(remoteURI);
-            if (setClientID) {
-                connection.setClientID(clientID);
+            JmsConnectionInfo connectionInfo = new JmsConnectionInfo(new JmsConnectionId(connectionId));
+
+            connectionInfo.setMessageIDPolicy(messageIDPolicy.copy());
+            connectionInfo.setPrefetchPolicy(prefetchPolicy.copy());
+            connectionInfo.setPresettlePolicy(presettlePolicy.copy());
+            connectionInfo.setRedeliveryPolicy(redeliveryPolicy.copy());
+            connectionInfo.setDeserializationPolicy(deserializationPolicy.copy());
+
+            PropertyUtil.setProperties(connectionInfo, properties);
+            connectionInfo.setUsername(username);
+            connectionInfo.setPassword(password);
+            connectionInfo.setConfiguredURI(remoteURI);
+            if (userSpecifiedClientId) {
+                connectionInfo.setClientId(clientID, true);
+            } else {
+                connectionInfo.setClientId(getClientIdGenerator().generateId(), false);
             }
 
-            return connection;
+            return connectionInfo;
         } catch (Exception e) {
             throw JmsExceptionSupport.create(e);
         }
@@ -276,7 +315,6 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
 
         try {
             result = ProviderFactory.create(remoteURI);
-            result.connect();
         } catch (Exception ex) {
             LOG.error("Failed to create JMS Provider instance for: {}", remoteURI.getScheme());
             LOG.trace("Error: ", ex);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
index 30e7252..ef2a75f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsQueueConnection.java
@@ -22,13 +22,13 @@ import javax.jms.ServerSessionPool;
 import javax.jms.Topic;
 import javax.jms.TopicSession;
 
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.provider.Provider;
-import org.apache.qpid.jms.util.IdGenerator;
 
 public class JmsQueueConnection extends JmsConnection implements AutoCloseable {
 
-    public JmsQueueConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
-        super(connectionId, provider, clientIdGenerator);
+    public JmsQueueConnection(JmsConnectionInfo connectionInfo, Provider provider) throws JMSException {
+        super(connectionInfo, provider);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
index 5b8f4d3..a0b9fdd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
@@ -22,13 +22,13 @@ import javax.jms.Queue;
 import javax.jms.QueueSession;
 import javax.jms.ServerSessionPool;
 
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.provider.Provider;
-import org.apache.qpid.jms.util.IdGenerator;
 
 public class JmsTopicConnection extends JmsConnection implements AutoCloseable {
 
-    public JmsTopicConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
-        super(connectionId, provider, clientIdGenerator);
+    public JmsTopicConnection(JmsConnectionInfo connectionInfo, Provider provider) throws JMSException {
+        super(connectionInfo, provider);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
index 412abd4..43d8b5d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java
@@ -113,7 +113,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne
         return forceAsyncSend;
     }
 
-    public void setForceAsyncSends(boolean forceAsyncSend) {
+    public void setForceAsyncSend(boolean forceAsyncSend) {
         this.forceAsyncSend = forceAsyncSend;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
index 32ed03c..27b4ec5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.meta.JmsSessionId;
@@ -43,9 +44,12 @@ public interface Provider {
      * is considered to be unusable and no further operations should be attempted
      * using this Provider.
      *
+     * @param connectionInfo
+     * 		The JmsConnectionInfo that contains the properties that define this connection.
+     *
      * @throws IOException if the remote resource can not be contacted.
      */
-    void connect() throws IOException;
+    void connect(JmsConnectionInfo connectionInfo) throws IOException;
 
     /**
      * Starts the Provider.  The start method provides a place for the Provider to perform

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 65d4447..74bfa16 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -24,6 +24,7 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsResource;
 import org.apache.qpid.jms.meta.JmsSessionId;
@@ -51,8 +52,8 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
     }
 
     @Override
-    public void connect() throws IOException {
-        next.connect();
+    public void connect(JmsConnectionInfo connectionInfo) throws IOException {
+        next.connect(connectionInfo);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 832aa0d..fdc0489 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
 
 import org.apache.qpid.jms.JmsTemporaryDestination;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
@@ -103,7 +102,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     private ProviderListener listener;
     private AmqpConnection connection;
-    private AmqpSaslAuthenticator authenticator;
+    private volatile AmqpSaslAuthenticator authenticator;
     private org.apache.qpid.jms.transports.Transport transport;
     private String transportType = AmqpProviderFactory.DEFAULT_TRANSPORT_TYPE;
     private String vhost;
@@ -125,7 +124,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private final Collector protonCollector = new CollectorImpl();
     private final Connection protonConnection = Connection.Factory.create();
 
-    private AsyncResult connectionOpenRequest;
+    private AsyncResult connectionRequest;
     private ScheduledFuture<?> nextIdleTimeoutCheck;
 
     /**
@@ -153,16 +152,72 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     }
 
     @Override
-    public void connect() throws IOException {
+    public void connect(final JmsConnectionInfo connectionInfo) throws IOException {
         checkClosed();
 
-        try {
-            transport = TransportFactory.create(getTransportType(), getRemoteURI());
-        } catch (Exception e) {
-            throw IOExceptionSupport.create(e);
+        final ProviderFuture connectRequest = new ProviderFuture();
+
+        serializer.execute(new Runnable() {
+
+            @Override
+            public void run() {
+
+                connectionRequest = connectRequest;
+
+                protonTransport.setEmitFlowEventOnSend(false);
+
+                if (getMaxFrameSize() > 0) {
+                    protonTransport.setMaxFrameSize(getMaxFrameSize());
+                }
+
+                protonTransport.setChannelMax(getChannelMax());
+                protonTransport.setIdleTimeout(idleTimeout);
+                protonTransport.bind(protonConnection);
+                protonConnection.collect(protonCollector);
+
+                try {
+                    transport = TransportFactory.create(getTransportType(), getRemoteURI());
+                } catch (Exception e) {
+                    connectionRequest.onFailure(IOExceptionSupport.create(e));
+                }
+
+                transport.setTransportListener(AmqpProvider.this);
+
+                try {
+                    transport.connect();
+                } catch (Exception e) {
+                    connectionRequest.onFailure(IOExceptionSupport.create(e));
+                }
+
+                if (saslLayer) {
+                    Sasl sasl = protonTransport.sasl();
+                    sasl.client();
+
+                    String hostname = getVhost();
+                    if (hostname == null) {
+                        hostname = remoteURI.getHost();
+                    } else if (hostname.isEmpty()) {
+                        hostname = null;
+                    }
+
+                    sasl.setRemoteHostname(hostname);
+
+                    authenticator = new AmqpSaslAuthenticator(connectionRequest, sasl, connectionInfo, transport.getLocalPrincipal(), saslMechanisms);
+                }
+
+                if (saslLayer) {
+                    pumpToProtonTransport();
+                } else {
+                    connectRequest.onSuccess();
+                }
+            }
+        });
+
+        if (connectionInfo.getConnectTimeout() != JmsConnectionInfo.INFINITE) {
+            connectRequest.sync(connectionInfo.getConnectTimeout(), TimeUnit.MILLISECONDS);
+        } else {
+            connectRequest.sync();
         }
-        transport.setTransportListener(this);
-        transport.connect();
     }
 
     @Override
@@ -271,35 +326,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                         public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
                             AmqpProvider.this.connectionInfo = connectionInfo;
 
-                            protonTransport.setEmitFlowEventOnSend(false);
-
-                            if (getMaxFrameSize() > 0) {
-                                protonTransport.setMaxFrameSize(getMaxFrameSize());
-                            }
-
-                            protonTransport.setChannelMax(getChannelMax());
-                            protonTransport.setIdleTimeout(idleTimeout);
-                            protonTransport.bind(protonConnection);
-                            protonConnection.collect(protonCollector);
-
-                            if (saslLayer) {
-                                Sasl sasl = protonTransport.sasl();
-                                sasl.client();
-
-                                String hostname = getVhost();
-                                if (hostname == null) {
-                                    hostname = remoteURI.getHost();
-                                } else if (hostname.isEmpty()) {
-                                    hostname = null;
-                                }
-
-                                sasl.setRemoteHostname(hostname);
-
-                                authenticator = new AmqpSaslAuthenticator(sasl, connectionInfo, transport.getLocalPrincipal(), saslMechanisms);
-                            }
-
                             AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
-                            AsyncResult wrappedOpenRequest = new AsyncResult() {
+                            connectionRequest = new AsyncResult() {
                                 @Override
                                 public void onSuccess() {
                                     fireConnectionEstablished();
@@ -317,9 +345,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
                                 }
                             };
 
-                            connectionOpenRequest = wrappedOpenRequest;
-
-                            builder.buildResource(wrappedOpenRequest);
+                            builder.buildResource(connectionRequest);
                         }
 
                         @Override
@@ -879,9 +905,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
         try {
             if (authenticator.authenticate()) {
+                if (!authenticator.wasSuccessful()) {
+                    // Close the transport to avoid emitting any additional frames.
+                    org.apache.qpid.proton.engine.Transport t = protonConnection.getTransport();
+                    t.close_head();
+                }
                 authenticator = null;
             }
-        } catch (JMSSecurityException ex) {
+        } catch (Throwable ex) {
             try {
                 org.apache.qpid.proton.engine.Transport t = protonConnection.getTransport();
                 t.close_head();
@@ -925,7 +956,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
 
     void fireConnectionEstablished() {
         // The request onSuccess calls this method
-        connectionOpenRequest = null;
+        connectionRequest = null;
 
         // Using nano time since it is not related to the wall clock, which may change
         long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
@@ -950,9 +981,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     }
 
     void fireProviderException(Throwable ex) {
-        if (connectionOpenRequest != null) {
-            connectionOpenRequest.onFailure(ex);
-            connectionOpenRequest = null;
+        if (connectionRequest != null) {
+            connectionRequest.onFailure(ex);
+            connectionRequest = null;
         }
 
         ProviderListener listener = this.listener;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
index 94bcf24..52b7674 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
@@ -24,6 +24,7 @@ import javax.jms.JMSSecurityException;
 import javax.security.sasl.SaslException;
 
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
 import org.apache.qpid.jms.sasl.Mechanism;
 import org.apache.qpid.jms.sasl.SaslMechanismFinder;
 import org.apache.qpid.proton.engine.Sasl;
@@ -38,6 +39,7 @@ public class AmqpSaslAuthenticator {
     private Mechanism mechanism;
     private final Principal localPrincipal;
     private Set<String> mechanismsRestriction;
+    private final AsyncResult authenticationRequest;
 
     /**
      * Create the authenticator and initialize it.
@@ -52,20 +54,22 @@ public class AmqpSaslAuthenticator {
      *        The possible mechanism(s) to which the client should restrict its
      *        mechanism selection to if offered by the server
      */
-    public AmqpSaslAuthenticator(Sasl sasl, JmsConnectionInfo info, Principal localPrincipal, String[] mechanismsRestriction) {
+    public AmqpSaslAuthenticator(AsyncResult request, Sasl sasl, JmsConnectionInfo info, Principal localPrincipal, String[] mechanismsRestriction) {
         this.sasl = sasl;
         this.info = info;
         this.localPrincipal = localPrincipal;
-        if(mechanismsRestriction != null) {
+        this.authenticationRequest = request;
+
+        if (mechanismsRestriction != null) {
             Set<String> mechs = new HashSet<String>();
-            for(int i = 0; i < mechanismsRestriction.length; i++) {
+            for (int i = 0; i < mechanismsRestriction.length; i++) {
                 String mech = mechanismsRestriction[i];
-                if(!mech.trim().isEmpty()) {
+                if (!mech.trim().isEmpty()) {
                     mechs.add(mech);
                 }
             }
 
-            if(!mechs.isEmpty()) {
+            if (!mechs.isEmpty()) {
                 this.mechanismsRestriction = mechs;
             }
         }
@@ -77,27 +81,50 @@ public class AmqpSaslAuthenticator {
      * successful authentication or a JMSSecurityException is thrown indicating that the
      * handshake failed.
      *
-     * @return true if the SASL handshake completes successfully.
+     * The result is delivered to the AsyncResult given to the constructor, which
+     * is marked as succeeded or failed once the SASL handshake reaches an outcome.
      *
-     * @throws JMSSecurityException if a security violation is detected during the handshake.
+     * @return true if the authentication process completed.
      */
     public boolean authenticate() throws JMSSecurityException {
+        try {
+            switch (sasl.getState()) {
+                case PN_SASL_IDLE:
+                    handleSaslInit();
+                    break;
+                case PN_SASL_STEP:
+                    handleSaslStep();
+                    break;
+                case PN_SASL_FAIL:
+                    handleSaslFail();
+                    break;
+                case PN_SASL_PASS:
+                    authenticationRequest.onSuccess();
+                default:
+                    break;
+            }
+        } catch (JMSSecurityException result) {
+            authenticationRequest.onFailure(result);
+        }
+
+        return authenticationRequest.isComplete();
+    }
+
+    public boolean wasSuccessful() throws IllegalStateException {
         switch (sasl.getState()) {
+            case PN_SASL_CONF:
             case PN_SASL_IDLE:
-                handleSaslInit();
-                break;
             case PN_SASL_STEP:
-                handleSaslStep();
                 break;
             case PN_SASL_FAIL:
-                handleSaslFail();
-                break;
+                return false;
             case PN_SASL_PASS:
                 return true;
             default:
+                break;
         }
 
-        return false;
+        throw new IllegalStateException("Authentication has not completed yet.");
     }
 
     private void handleSaslInit() throws JMSSecurityException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 134e97a..7a76c7e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -106,6 +106,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     private long nextReconnectDelay = -1;
     private IOException failureCause;
     private URI connectedURI;
+    private volatile JmsConnectionInfo connectionInfo;
 
     // Timeout values configured via JmsConnectionInfo
     private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
@@ -160,8 +161,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     }
 
     @Override
-    public void connect() throws IOException {
+    public void connect(JmsConnectionInfo connectionInfo) throws IOException {
         checkClosed();
+        this.connectionInfo = connectionInfo;
         LOG.debug("Initiating initial connection attempt task");
         triggerReconnectionAttempt();
     }
@@ -690,7 +692,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                     try {
                         LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts, target);
                         provider = ProviderFactory.create(target);
-                        provider.connect();
+                        provider.connect(connectionInfo);
                         initializeNewConnection(provider);
                         return;
                     } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index df8c713..9cf74c4 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -36,6 +36,8 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.jms.TemporaryTopic;
 
+import org.apache.qpid.jms.meta.JmsConnectionId;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.mock.MockProvider;
@@ -54,10 +56,13 @@ public class JmsConnectionTest {
 
     private MockProvider provider;
     private JmsConnection connection;
+    private JmsConnectionInfo connectionInfo;
 
     @Before
     public void setUp() throws Exception {
         provider = (MockProvider) MockProviderFactory.create(new URI("mock://localhost"));
+        connectionInfo = new JmsConnectionInfo(new JmsConnectionId("ID:TEST:1"));
+        connectionInfo.setClientId(clientIdGenerator.generateId(), false);
     }
 
     @After
@@ -70,12 +75,12 @@ public class JmsConnectionTest {
     @Test(timeout=30000, expected=JMSException.class)
     public void testJmsConnectionThrowsJMSExceptionProviderStartFails() throws JMSException, IllegalStateException, IOException {
         provider.getConfiguration().setFailOnStart(true);
-        try (JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);) {}
+        try (JmsConnection connection = new JmsConnection(connectionInfo, provider);) {}
     }
 
     @Test(timeout=30000)
     public void testStateAfterCreate() throws JMSException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         assertFalse(connection.isStarted());
         assertFalse(connection.isClosed());
@@ -84,7 +89,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testGetExceptionListener() throws JMSException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         assertNull(connection.getExceptionListener());
         connection.setExceptionListener(new ExceptionListener() {
@@ -99,7 +104,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testReplacePrefetchPolicy() throws JMSException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         JmsDefaultPrefetchPolicy newPolicy = new JmsDefaultPrefetchPolicy();
         newPolicy.setAll(1);
@@ -111,13 +116,13 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testGetConnectionId() throws JMSException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         assertEquals("ID:TEST:1", connection.getId().toString());
     }
 
     @Test(timeout=30000)
     public void testAddConnectionListener() throws JMSException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         JmsConnectionListener listener = new JmsDefaultConnectionListener();
         assertFalse(connection.removeConnectionListener(listener));
         connection.addConnectionListener(listener);
@@ -126,7 +131,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testConnectionStart() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         assertFalse(connection.isConnected());
         connection.start();
@@ -135,7 +140,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testConnectionMulitpleStartCalls() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         assertFalse(connection.isConnected());
         connection.start();
@@ -146,7 +151,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testConnectionStartAndStop() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         assertFalse(connection.isConnected());
         connection.start();
@@ -157,14 +162,14 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000, expected=InvalidClientIDException.class)
     public void testSetClientIDFromNull() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         assertFalse(connection.isConnected());
         connection.setClientID("");
     }
 
     @Test(timeout=30000)
     public void testCreateNonTXSessionWithTXAckMode() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.start();
 
         try {
@@ -176,7 +181,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testCreateNonTXSessionWithUnknownAckMode() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.start();
 
         try {
@@ -188,7 +193,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testCreateSessionWithUnknownAckMode() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.start();
 
         try {
@@ -200,7 +205,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testCreateSessionDefaultMode() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.start();
 
         JmsSession session = (JmsSession) connection.createSession();
@@ -209,14 +214,14 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000, expected=InvalidClientIDException.class)
     public void testSetClientIDFromEmptyString() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         assertFalse(connection.isConnected());
         connection.setClientID(null);
     }
 
     @Test(timeout=30000, expected=IllegalStateException.class)
     public void testSetClientIDFailsOnSecondCall() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         assertFalse(connection.isConnected());
         connection.setClientID("TEST-ID");
@@ -226,7 +231,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000, expected=IllegalStateException.class)
     public void testSetClientIDFailsAfterStart() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         assertFalse(connection.isConnected());
         connection.start();
@@ -236,7 +241,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testDeleteOfTempQueueOnClosedConnection() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -253,7 +258,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testDeleteOfTempTopicOnClosedConnection() throws JMSException, IOException {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.start();
 
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -270,7 +275,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testConnectionCreatedSessionRespectsAcknowledgementMode() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.start();
 
         JmsSession session = (JmsSession) connection.createSession(Session.SESSION_TRANSACTED);
@@ -283,7 +288,7 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000)
     public void testConnectionMetaData() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
 
         ConnectionMetaData metaData = connection.getMetaData();
 
@@ -305,37 +310,37 @@ public class JmsConnectionTest {
 
     @Test(timeout=30000, expected=JMSException.class)
     public void testCreateConnectionConsumer() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.createConnectionConsumer((JmsDestination) new JmsTopic(), "", null, 1);
     }
 
     @Test(timeout=30000, expected=JMSException.class)
     public void testCreateConnectionTopicConsumer() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.createConnectionConsumer(new JmsTopic(), "", null, 1);
     }
 
     @Test(timeout=30000, expected=JMSException.class)
     public void testCreateConnectionQueueConsumer() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.createConnectionConsumer(new JmsQueue(), "", null, 1);
     }
 
     @Test(timeout=30000, expected=JMSException.class)
     public void testCreateDurableConnectionQueueConsumer() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.createDurableConnectionConsumer(new JmsTopic(), "", "", null, 1);
     }
 
     @Test(timeout=30000, expected=JMSException.class)
     public void testCreateSharedConnectionConsumer() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.createSharedConnectionConsumer(new JmsTopic(), "id", "", null, 1);
     }
 
     @Test(timeout=30000, expected=JMSException.class)
     public void testCreateSharedDurableConnectionConsumer() throws Exception {
-        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection = new JmsConnection(connectionInfo, provider);
         connection.createSharedDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
index 3986f8b..5514c61 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
@@ -20,6 +20,8 @@ import java.net.URI;
 
 import javax.jms.JMSContext;
 
+import org.apache.qpid.jms.meta.JmsConnectionId;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.mock.MockProviderFactory;
@@ -27,6 +29,7 @@ import org.apache.qpid.jms.provider.mock.MockProviderListener;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.util.IdGenerator;
 import org.junit.After;
+import org.junit.Before;
 
 /**
  * Base for tests that require a JmsConnection that is created using a
@@ -42,6 +45,7 @@ public class JmsConnectionTestSupport extends QpidJmsTestCase {
     protected JmsTopicConnection topicConnection;
     protected JmsQueueConnection queueConnection;
     protected ProviderListener providerListener;
+    protected JmsConnectionInfo connectionInfo;
 
     private Provider createMockProvider() throws Exception {
         return mockFactory.createProvider(new URI("mock://localhost")).setEventListener(new MockProviderListener() {
@@ -54,22 +58,30 @@ public class JmsConnectionTestSupport extends QpidJmsTestCase {
     }
 
     protected JmsContext createJMSContextToMockProvider() throws Exception {
-        JmsConnection connection = new JmsConnection("ID:TEST:1", createMockProvider(), clientIdGenerator);
+        JmsConnection connection = new JmsConnection(connectionInfo, createMockProvider());
         JmsContext context = new JmsContext(connection, JMSContext.AUTO_ACKNOWLEDGE);
 
         return context;
     }
 
     protected JmsConnection createConnectionToMockProvider() throws Exception {
-        return new JmsConnection("ID:TEST:1", createMockProvider(), clientIdGenerator);
+        return new JmsConnection(connectionInfo, createMockProvider());
     }
 
     protected JmsQueueConnection createQueueConnectionToMockProvider() throws Exception {
-        return new JmsQueueConnection("ID:TEST:1", createMockProvider(), clientIdGenerator);
+        return new JmsQueueConnection(connectionInfo, createMockProvider());
     }
 
     protected JmsTopicConnection createTopicConnectionToMockProvider() throws Exception {
-        return new JmsTopicConnection("ID:TEST:1", createMockProvider(), clientIdGenerator);
+        return new JmsTopicConnection(connectionInfo, createMockProvider());
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        connectionInfo = new JmsConnectionInfo(new JmsConnectionId("ID:TEST:1"));
+        connectionInfo.setClientId(clientIdGenerator.generateId(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
index b247015..dc11976 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java
@@ -60,10 +60,18 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testCreateConnectionGoodProviderURI() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             // DONT create a test fixture, we will drive everything directly.
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory(new URI("amqp://127.0.0.1:" + testPeer.getServerPort()));
             Connection connection = factory.createConnection();
             assertNotNull(connection);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -71,10 +79,18 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testCreateConnectionGoodProviderString() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             // DONT create a test fixture, we will drive everything directly.
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:" + testPeer.getServerPort());
             Connection connection = factory.createConnection();
             assertNotNull(connection);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -82,10 +98,18 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testTopicCreateConnectionGoodProviderString() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             // DONT create a test fixture, we will drive everything directly.
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:" + testPeer.getServerPort());
             TopicConnection connection = factory.createTopicConnection();
             assertNotNull(connection);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -93,10 +117,18 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testCreateQueueConnectionGoodProviderString() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             // DONT create a test fixture, we will drive everything directly.
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:" + testPeer.getServerPort());
             QueueConnection connection = factory.createQueueConnection();
             assertNotNull(connection);
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -104,7 +136,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
     @Test(timeout=20000)
     public void testUriOptionsAppliedToConnection() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             // DONT create a test fixture, we will drive everything directly.
+            testPeer.expectSaslAnonymous();
+
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.localMessagePriority=true&jms.forceAsyncSend=true";
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
             assertTrue(factory.isLocalMessagePriority());
@@ -114,6 +151,9 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
             assertNotNull(connection);
             assertTrue(connection.isLocalMessagePriority());
             assertTrue(connection.isForceAsyncSend());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -170,20 +210,28 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    @Test(timeout=20000)
+    @Test(timeout = 20000)
     public void testMessageIDFormatOptionApplied() throws Exception {
         BUILTIN[] formatters = JmsMessageIDBuilder.BUILTIN.values();
 
         for (BUILTIN formatter : formatters) {
             LOG.info("Testing application of Message ID Format: {}", formatter.name());
             try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+                // Ignore errors from peer close due to not sending any Open / Close frames
+                testPeer.setSuppressReadExceptionOnClose(true);
+
                 // DONT create a test fixture, we will drive everything directly.
+                testPeer.expectSaslAnonymous();
+
                 String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=" + formatter.name();
                 JmsConnectionFactory factory = new JmsConnectionFactory(uri);
                 assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).getMessageIDType());
 
                 JmsConnection connection = (JmsConnection) factory.createConnection();
                 assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString());
+
+                testPeer.waitForAllHandlersToComplete(1000);
+
                 connection.close();
             }
         }
@@ -194,7 +242,12 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
         CustomJmsMessageIdBuilder custom = new CustomJmsMessageIdBuilder();
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             // DONT create a test fixture, we will drive everything directly.
+            testPeer.expectSaslAnonymous();
+
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
 
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
@@ -203,6 +256,9 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
 
             JmsConnection connection = (JmsConnection) factory.createConnection();
             assertEquals(custom.toString(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -212,8 +268,13 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
         CustomJmsMessageIDPolicy custom = new CustomJmsMessageIDPolicy();
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
 
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
             factory.setMessageIDPolicy(custom);
             assertEquals(custom, factory.getMessageIDPolicy());
@@ -221,6 +282,9 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) factory.createConnection();
             assertTrue(connection.getMessageIDPolicy() instanceof CustomJmsMessageIDPolicy);
             assertNotSame(custom, connection.getMessageIDPolicy());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -230,8 +294,13 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
         CustomJmsPrefetchPolicy custom = new CustomJmsPrefetchPolicy();
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
 
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
             factory.setPrefetchPolicy(custom);
             assertEquals(custom, factory.getPrefetchPolicy());
@@ -239,6 +308,9 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) factory.createConnection();
             assertTrue(connection.getPrefetchPolicy() instanceof CustomJmsPrefetchPolicy);
             assertNotSame(custom, connection.getPrefetchPolicy());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -248,8 +320,13 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
         CustomJmsPresettlePolicy custom = new CustomJmsPresettlePolicy();
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
 
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
             factory.setPresettlePolicy(custom);
             assertEquals(custom, factory.getPresettlePolicy());
@@ -257,6 +334,9 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) factory.createConnection();
             assertTrue(connection.getPresettlePolicy() instanceof CustomJmsPresettlePolicy);
             assertNotSame(custom, connection.getPresettlePolicy());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }
@@ -266,8 +346,13 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
         CustomJmsRedeliveryPolicy custom = new CustomJmsRedeliveryPolicy();
 
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Ignore errors from peer close due to not sending any Open / Close frames
+            testPeer.setSuppressReadExceptionOnClose(true);
+
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort();
 
+            testPeer.expectSaslAnonymous();
+
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
             factory.setRedeliveryPolicy(custom);
             assertEquals(custom, factory.getRedeliveryPolicy());
@@ -275,6 +360,9 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase {
             JmsConnection connection = (JmsConnection) factory.createConnection();
             assertTrue(connection.getRedeliveryPolicy() instanceof CustomJmsRedeliveryPolicy);
             assertNotSame(custom, connection.getRedeliveryPolicy());
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
             connection.close();
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 6ff3c53..92f0752 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -292,7 +292,8 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
                     hasEntry(AmqpSupport.VERSION, MetaDataSupport.PROVIDER_VERSION),
                     hasEntry(AmqpSupport.PLATFORM, MetaDataSupport.PLATFORM_DETAILS));
 
-            testPeer.expectSaslAnonymousConnect(null, null, connPropsMatcher, null);
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(connPropsMatcher, null, false);
             testPeer.expectBegin();
 
             ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=foo");
@@ -354,7 +355,8 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
 
     private void doAmqpHostnameTestImpl(String amqpHostname, boolean setHostnameOption, Matcher<?> hostnameMatcher) throws JMSException, InterruptedException, Exception, IOException {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            testPeer.expectSaslAnonymousConnect(null, hostnameMatcher);
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(null, hostnameMatcher, false);
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
index 556ae64..d6fa8d3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
@@ -74,7 +74,8 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase {
     @Test(timeout = 20000)
     public void testConnectThrowsTimedOutExceptioWhenResponseNotSent() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            testPeer.expectSaslAnonymousConnect(true);
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(true);
             testPeer.expectClose();
             try {
                 establishAnonymousConnecton(testPeer, true, "jms.connectTimeout=500");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
index 04f2ce4..f5ace89 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.jms.integration;
 
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -52,7 +51,8 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
     @Test(timeout = 20000)
     public void testIdleTimeoutIsAdvertisedByDefault() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
-            testPeer.expectSaslAnonymousConnect(greaterThan(UnsignedInteger.valueOf(0)), null);
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(null, greaterThan(UnsignedInteger.valueOf(0)), null, false);
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -75,7 +75,9 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
             int configuredTimeout = 54320;
             int advertisedValue = configuredTimeout / 2;
 
-            testPeer.expectSaslAnonymousConnect(equalTo(UnsignedInteger.valueOf(advertisedValue)), null);
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen(null, greaterThan(UnsignedInteger.valueOf(advertisedValue)), null, false);
+
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -101,7 +103,9 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
 
             testPeer.setAdvertisedIdleTimeout(advertisedTimeout);
 
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -139,7 +143,9 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
 
             testPeer.setAdvertisedIdleTimeout(advertisedTimeout);
 
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -168,7 +174,9 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             int configuredTimeout = 200;
 
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 
@@ -204,7 +212,9 @@ public class IdleTimeoutIntegrationTest extends QpidJmsTestCase {
 
             final CountDownLatch latch = new CountDownLatch(cycles);
 
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
+
             // Each connection creates a session for managing temporary destinations etc
             testPeer.expectBegin();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
index afcef74..acb386f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
@@ -61,7 +61,8 @@ public class IntegrationTestFixture {
     Connection establishConnecton(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId) throws JMSException {
         Symbol[] desiredCapabilities = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY };
 
-        testPeer.expectSaslPlainConnect("guest", "guest", desiredCapabilities, serverCapabilities, serverProperties);
+        testPeer.expectSaslPlain("guest", "guest");
+        testPeer.expectOpen(desiredCapabilities, serverCapabilities, serverProperties);
 
         // Each connection creates a session for managing temporary destinations etc
         testPeer.expectBegin();
@@ -112,7 +113,8 @@ public class IntegrationTestFixture {
     JMSContext createJMSContext(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId, int sessionMode) throws JMSException {
         Symbol[] desiredCapabilities = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY };
 
-        testPeer.expectSaslPlainConnect("guest", "guest", desiredCapabilities, serverCapabilities, serverProperties);
+        testPeer.expectSaslPlain("guest", "guest");
+        testPeer.expectOpen(desiredCapabilities, serverCapabilities, serverProperties);
 
         // Each connection creates a session for managing temporary destinations etc
         testPeer.expectBegin();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/22bbb5da/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 36e6422..1d7f932 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -677,13 +677,13 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID_STRING";
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
 
-            Connection connection = factory.createConnection();
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
-
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
 
+            Connection connection = factory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             String queueName = "myQueue";
             Queue queue = session.createQueue(queueName);
@@ -735,13 +735,13 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID";
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
 
-            Connection connection = factory.createConnection();
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
-
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
 
+            Connection connection = factory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             String queueName = "myQueue";
             Queue queue = session.createQueue(queueName);
@@ -790,13 +790,13 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=PREFIXED_UUID_STRING";
             JmsConnectionFactory factory = new JmsConnectionFactory(uri);
 
-            Connection connection = factory.createConnection();
-            testPeer.expectSaslAnonymousConnect();
+            testPeer.expectSaslAnonymous();
+            testPeer.expectOpen();
             testPeer.expectBegin();
-
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
 
+            Connection connection = factory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             String queueName = "myQueue";
             Queue queue = session.createQueue(queueName);
@@ -1587,7 +1587,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String user = "user";
             String pass = "qwerty123456";
 
-            testPeer.expectSaslPlainConnect(user, pass, null, null);
+            testPeer.expectSaslPlain(user, pass);
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1630,7 +1631,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String user = "user";
             String pass = "qwerty123456";
 
-            testPeer.expectSaslPlainConnect(user, pass, null, null);
+            testPeer.expectSaslPlain(user, pass);
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1672,7 +1674,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String user = "user";
             String pass = "qwerty123456";
 
-            testPeer.expectSaslPlainConnect(user, pass, null, null);
+            testPeer.expectSaslPlain(user, pass);
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1718,7 +1721,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String user = "user";
             String pass = "qwerty123456";
 
-            testPeer.expectSaslPlainConnect(user, pass, null, null);
+            testPeer.expectSaslPlain(user, pass);
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1789,7 +1793,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String user = "user";
             String pass = "qwerty123456";
 
-            testPeer.expectSaslPlainConnect(user, pass, null, null);
+            testPeer.expectSaslPlain(user, pass);
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();
@@ -1835,7 +1840,8 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             String user = "user";
             String pass = "qwerty123456";
 
-            testPeer.expectSaslPlainConnect(user, pass, null, null);
+            testPeer.expectSaslPlain(user, pass);
+            testPeer.expectOpen();
             testPeer.expectBegin();
             testPeer.expectBegin();
             testPeer.expectSenderAttach();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org