You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/02/24 22:36:36 UTC

[2/3] qpid-jms git commit: QPIDJMS-267 Failover discovery via connection properties

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 63702c9..b26cb5f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -75,54 +75,54 @@ public class AmqpProviderTest extends QpidJmsTestCase {
 
     @Test(timeout=20000)
     public void testCreate() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
     }
 
     @Test(timeout=20000, expected=RuntimeException.class)
     public void testGetMessageFactoryTrowsWhenNotConnected() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         provider.getMessageFactory();
     }
 
     @Test(timeout=20000)
     public void testUnInitializedProviderReturnsDefaultConnectTimeout() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         assertEquals(JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT, provider.getConnectTimeout());
     }
 
     @Test(timeout=20000)
     public void testUnInitializedProviderReturnsDefaultCloseTimeout() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         assertEquals(JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT, provider.getCloseTimeout());
     }
 
     @Test(timeout=20000)
     public void testUnInitializedProviderReturnsDefaultSendTimeout() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         assertEquals(JmsConnectionInfo.DEFAULT_SEND_TIMEOUT, provider.getSendTimeout());
     }
 
     @Test(timeout=20000)
     public void testUnInitializedProviderReturnsDefaultRequestTimeout() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         assertEquals(JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT, provider.getRequestTimeout());
     }
 
     @Test(timeout=20000)
     public void testGetDefaultDrainTimeout() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         assertEquals(TimeUnit.MINUTES.toMillis(1), provider.getDrainTimeout());
     }
 
     @Test(timeout=20000)
     public void testGetDefaultIdleTimeout() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         assertEquals(TimeUnit.MINUTES.toMillis(1), provider.getIdleTimeout());
     }
 
     @Test(timeout=20000)
     public void testEnableTraceFrames() throws Exception {
-        provider = new AmqpProvider(getDefaultURI());
+        provider = new AmqpProviderFactory().createProvider(getDefaultURI());
         TransportImpl transport = (TransportImpl) provider.getProtonTransport();
         assertNotNull(transport);
         assertNull(transport.getProtocolTracer());
@@ -131,19 +131,13 @@ public class AmqpProviderTest extends QpidJmsTestCase {
     }
 
     @Test(timeout=20000)
-    public void testConnectWithUnknownProtocol() throws Exception {
-        try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
-            provider = new AmqpProvider(getPeerURI(testPeer));
-            provider.setTransportType("ftp");
-            try {
-                provider.connect(connectionInfo);
-                fail("Should have failed to connect.");
-            } catch (Exception ex) {
-            }
-
-            provider.close();
-
-            testPeer.waitForAllHandlersToComplete(1000);
+    public void testCreateFailsWithUnknownProtocol() throws Exception {
+        try {
+            AmqpProviderFactory factory = new AmqpProviderFactory();
+            factory.setTransportScheme("ftp");
+            factory.createProvider(new URI("ftp://localhost:5672"));
+            fail("Should have failed to connect.");
+        } catch (Exception ex) {
         }
     }
 
@@ -153,7 +147,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
             URI peerURI = getPeerURI(testPeer);
             testPeer.close();
 
-            provider = new AmqpProvider(peerURI);
+            provider = new AmqpProviderFactory().createProvider(peerURI);
             try {
                 provider.connect(connectionInfo);
                 fail("Should have failed to connect.");
@@ -166,7 +160,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
     public void testDisableSaslLayer() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
 
             provider.setSaslLayer(false);
             provider.connect(connectionInfo);
@@ -185,7 +179,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
             testPeer.expectSaslAnonymous();
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
 
             TransportImpl transport = (TransportImpl) provider.getProtonTransport();
 
@@ -210,7 +204,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
             testPeer.expectSaslAnonymous();
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
 
             TransportImpl transport = (TransportImpl) provider.getProtonTransport();
 
@@ -235,7 +229,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
             testPeer.expectSaslAnonymous();
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
 
             TransportImpl transport = (TransportImpl) provider.getProtonTransport();
 
@@ -259,7 +253,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
             testPeer.expectSaslAnonymous();
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
             provider.connect(connectionInfo);
 
             assertNull(provider.getProviderListener());
@@ -294,7 +288,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
             testPeer.expectSaslAnonymous();
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
             provider.connect(connectionInfo);
             assertTrue(provider.toString().contains("localhost"));
             assertTrue(provider.toString().contains(String.valueOf(testPeer.getServerPort())));
@@ -315,7 +309,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
             testPeer.expectOpen();
             testPeer.expectClose();
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
             provider.connect(connectionInfo);
             provider.close();
 
@@ -352,7 +346,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
             connectionInfo.setUsername(TEST_USERNAME);
             connectionInfo.setPassword(TEST_PASSWORD);
 
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
             testPeer.expectSaslPlain(TEST_USERNAME, TEST_PASSWORD);
             testPeer.expectOpen();
             testPeer.expectBegin();
@@ -403,7 +397,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
 
     private void doErrorDuringOperationFailsRequesTTestImpl(Op operation) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
-            provider = new AmqpProvider(getPeerURI(testPeer));
+            provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
 
             final AtomicBoolean errorThrown = new AtomicBoolean();
             JmsResource resourceInfo = new JmsAbstractResource() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
index 70ebd11..15f29d8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -30,46 +32,55 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
 import org.apache.qpid.proton.amqp.transport.ErrorCondition;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class AmqpSupportTest {
 
     @Test
-    public void testCreateRedirectionException() {
+    public void testCreateRedirectionException() throws URISyntaxException {
         ErrorCondition condition = new ErrorCondition();
 
+        AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+        Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
         Map<Symbol, Object> info = new HashMap<>();
         info.put(AmqpSupport.PORT, "5672");
         info.put(AmqpSupport.OPEN_HOSTNAME, "localhost.localdomain");
         info.put(AmqpSupport.NETWORK_HOST, "localhost");
         info.put(AmqpSupport.SCHEME, "amqp");
-        info.put(AmqpSupport.PATH, "websocket");
+        info.put(AmqpSupport.PATH, "/websocket");
 
         condition.setInfo(info);
 
         Symbol error = AmqpError.INTERNAL_ERROR;
         String message = "Failed to connect";
 
-        Exception result = AmqpSupport.createRedirectException(error, message, condition);
+        Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
 
         assertNotNull(result);
         assertTrue(result instanceof ProviderRedirectedException);
 
         ProviderRedirectedException pre = (ProviderRedirectedException) result;
 
-        assertEquals(5672, pre.getPort());
-        assertEquals("localhost.localdomain", pre.getHostname());
-        assertEquals("localhost", pre.getNetworkHost());
-        assertEquals("amqp", pre.getScheme());
-        assertEquals("websocket", pre.getPath());
+        URI redirection = pre.getRedirectionURI();
+
+        assertEquals(5672, redirection.getPort());
+        assertTrue("localhost.localdomain", redirection.getQuery().contains("amqp.vhost=localhost.localdomain"));
+        assertEquals("localhost", redirection.getHost());
+        assertEquals("amqp", redirection.getScheme());
+        assertEquals("/websocket", redirection.getPath());
     }
 
     @Test
-    public void testCreateRedirectionExceptionWithNoRedirectInfo() {
+    public void testCreateRedirectionExceptionWithNoRedirectInfo() throws URISyntaxException {
+        AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+        Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
         ErrorCondition condition = new ErrorCondition();
         Symbol error = AmqpError.INTERNAL_ERROR;
         String message = "Failed to connect";
 
-        Exception result = AmqpSupport.createRedirectException(error, message, condition);
+        Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
 
         assertNotNull(result);
         assertFalse(result instanceof ProviderRedirectedException);
@@ -77,7 +88,10 @@ public class AmqpSupportTest {
     }
 
     @Test
-    public void testCreateRedirectionExceptionWithNoNetworkHost() {
+    public void testCreateRedirectionExceptionWithNoNetworkHost() throws URISyntaxException {
+        AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+        Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
         ErrorCondition condition = new ErrorCondition();
 
         Map<Symbol, Object> info = new HashMap<>();
@@ -91,7 +105,7 @@ public class AmqpSupportTest {
         Symbol error = AmqpError.INTERNAL_ERROR;
         String message = "Failed to connect";
 
-        Exception result = AmqpSupport.createRedirectException(error, message, condition);
+        Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
 
         assertNotNull(result);
         assertFalse(result instanceof ProviderRedirectedException);
@@ -99,7 +113,10 @@ public class AmqpSupportTest {
     }
 
     @Test
-    public void testCreateRedirectionExceptionWithEmptyNetworkHost() {
+    public void testCreateRedirectionExceptionWithEmptyNetworkHost() throws URISyntaxException {
+        AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+        Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
         ErrorCondition condition = new ErrorCondition();
 
         Map<Symbol, Object> info = new HashMap<>();
@@ -114,7 +131,7 @@ public class AmqpSupportTest {
         Symbol error = AmqpError.INTERNAL_ERROR;
         String message = "Failed to connect";
 
-        Exception result = AmqpSupport.createRedirectException(error, message, condition);
+        Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
 
         assertNotNull(result);
         assertFalse(result instanceof ProviderRedirectedException);
@@ -122,7 +139,10 @@ public class AmqpSupportTest {
     }
 
     @Test
-    public void testCreateRedirectionExceptionWithInvalidPort() {
+    public void testCreateRedirectionExceptionWithInvalidPort() throws URISyntaxException {
+        AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+        Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
         ErrorCondition condition = new ErrorCondition();
 
         Map<Symbol, Object> info = new HashMap<>();
@@ -137,7 +157,7 @@ public class AmqpSupportTest {
         Symbol error = AmqpError.INTERNAL_ERROR;
         String message = "Failed to connect";
 
-        Exception result = AmqpSupport.createRedirectException(error, message, condition);
+        Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
 
         assertNotNull(result);
         assertFalse(result instanceof ProviderRedirectedException);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
index e543eec..1120285 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
@@ -419,4 +419,31 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         assertEquals(SEND_TIMEOUT, provider.getSendTimeout());
         assertEquals(REQUEST_TIMEOUT, provider.getRequestTimeout());
     }
+
+    @Test(timeout = 30000)
+    public void testAmqpOpenServerListBehaviourDefault() {
+        provider = new FailoverProvider(uris);
+        assertEquals("REPLACE", provider.getAmqpOpenServerListBehaviour());
+    }
+
+    @Test(timeout = 30000)
+    public void testSetGetAmqpOpenServerListBehaviour() {
+        provider = new FailoverProvider(uris);
+        String behaviour = "ADD";
+        assertFalse(behaviour.equals(provider.getAmqpOpenServerListBehaviour()));
+
+        provider.setAmqpOpenServerListBehaviour(behaviour);
+        assertEquals(behaviour, provider.getAmqpOpenServerListBehaviour());
+    }
+
+    @Test(timeout = 30000)
+    public void testSetInvalidAmqpOpenServerListBehaviourThrowsIAE() {
+        provider = new FailoverProvider(uris);
+        try {
+            provider.setAmqpOpenServerListBehaviour("invalid");
+            fail("no exception was thrown");
+        } catch (IllegalArgumentException iae) {
+            // Expected
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
index 5c9bef6..bcf9d2c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
@@ -22,6 +22,7 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT;
 import static org.junit.Assert.assertTrue;
 
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
@@ -29,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
@@ -57,7 +57,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
              TestAmqpPeer redirectedPeer = new TestAmqpPeer();) {
 
             final CountDownLatch connected = new CountDownLatch(1);
-            final String redirectURI = createPeerURI(redirectedPeer);
+            final URI redirectURI = createPeerURI(redirectedPeer);
             LOG.info("Backup peer is at: {}", redirectURI);
 
             redirectedPeer.expectSaslAnonymous();
@@ -76,7 +76,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
                 @Override
                 public void onConnectionEstablished(URI remoteURI) {
                     LOG.info("Connection Established: {}", remoteURI);
-                    if (redirectURI.equals(remoteURI.toString())) {
+                    if (isExpectedHost(redirectURI, remoteURI)) {
                         connected.countDown();
                     }
                 }
@@ -100,8 +100,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
             final CountDownLatch connectedToPrimary = new CountDownLatch(1);
             final CountDownLatch connectedToBackup = new CountDownLatch(1);
 
-            final String rejectingURI = createPeerURI(rejectingPeer);
-            final String redirectURI = createPeerURI(redirectedPeer);
+            final URI rejectingURI = createPeerURI(rejectingPeer);
+            final URI redirectURI = createPeerURI(redirectedPeer);
             LOG.info("Primary is at {}: Backup peer is at: {}", rejectingURI, redirectURI);
 
             redirectedPeer.expectSaslAnonymous();
@@ -123,7 +123,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
                 @Override
                 public void onConnectionEstablished(URI remoteURI) {
                     LOG.info("Connection Established: {}", remoteURI);
-                    if (remoteURI.toString().equals(rejectingURI)) {
+                    if (isExpectedHost(rejectingURI, remoteURI)) {
                         connectedToPrimary.countDown();
                     }
                 }
@@ -131,7 +131,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
                 @Override
                 public void onConnectionRestored(URI remoteURI) {
                     LOG.info("Connection Reestablished: {}", remoteURI);
-                    if (remoteURI.toString().equals(redirectURI)) {
+                    if (isExpectedHost(redirectURI, remoteURI)) {
                         connectedToBackup.countDown();
                     }
                 }
@@ -149,8 +149,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
         }
     }
 
-    private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws JMSException {
-        final String remoteURI = "failover:(" + createPeerURI(testPeer) + ")";
+    private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws Exception {
+        final String remoteURI = "failover:(" + createPeerURI(testPeer).toString() + ")";
 
         ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
         Connection connection = factory.createConnection();
@@ -158,7 +158,22 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
         return (JmsConnection) connection;
     }
 
-    private String createPeerURI(TestAmqpPeer peer) {
-        return "amqp://localhost:" + peer.getServerPort();
+    private boolean isExpectedHost(URI expected, URI actual) {
+        if (!expected.getHost().equals(actual.getHost())) {
+            LOG.info("Expected host {} but got host {}", expected.getHost(), actual.getHost());
+            return false;
+        }
+
+        if (expected.getPort() != actual.getPort()) {
+            LOG.info("Expected host {} on port {} but got host {} on port {}",
+                expected.getHost(), expected.getPort(), actual.getHost(), actual.getPort());
+            return false;
+        }
+
+        return true;
+    }
+
+    private URI createPeerURI(TestAmqpPeer peer) throws URISyntaxException {
+        return new URI("amqp://localhost:" + peer.getServerPort());
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
index 39917cf..b59e1b6 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
@@ -29,6 +29,7 @@ import java.net.InetAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -299,6 +300,33 @@ public class FailoverUriPoolTest extends QpidJmsTestCase {
     }
 
     @Test
+    public void testAddAllHandlesNulls() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool(uris, null);
+        pool.setRandomize(false);
+        pool.addAll(null);
+
+        assertEquals(uris.size(), pool.size());
+    }
+
+    @Test
+    public void testAddAllHandlesEmpty() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool(uris, null);
+        pool.setRandomize(false);
+        pool.addAll(Collections.emptyList());
+
+        assertEquals(uris.size(), pool.size());
+    }
+
+    @Test
+    public void testAddAll() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool(null, null);
+        pool.setRandomize(false);
+        pool.addAll(uris);
+
+        assertEquals(uris.size(), pool.size());
+    }
+
+    @Test
     public void testRemoveURIFromPool() throws URISyntaxException {
         FailoverUriPool pool = new FailoverUriPool(uris, null);
         pool.setRandomize(false);
@@ -526,4 +554,16 @@ public class FailoverUriPoolTest extends QpidJmsTestCase {
 
         return resolutionWorks;
     }
+
+    @Test
+    public void testRemoveAll() throws URISyntaxException {
+        FailoverUriPool pool = new FailoverUriPool(uris, null);
+        assertEquals(uris.size(), pool.size());
+
+        pool.removeAll();
+        assertTrue(pool.isEmpty());
+        assertEquals(0, pool.size());
+
+        pool.removeAll();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
new file mode 100644
index 0000000..bdfa3e5
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
@@ -0,0 +1,1142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.net.ssl.SSLContext;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.transports.TransportSslOptions;
+import org.apache.qpid.jms.transports.TransportSupport;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverWithAmqpOpenProvidedServerListIntegrationTest.class);
+
+    private static final String BROKER_JKS_KEYSTORE = "src/test/resources/broker-jks.keystore";
+    private static final String BROKER_JKS_TRUSTSTORE = "src/test/resources/broker-jks.truststore";
+    private static final String PASSWORD = "password";
+    private static final String CLIENT_JKS_KEYSTORE = "src/test/resources/client-jks.keystore";
+    private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore";
+
+    private static final String JAVAX_NET_SSL_KEY_STORE = "javax.net.ssl.keyStore";
+    private static final String JAVAX_NET_SSL_KEY_STORE_PASSWORD = "javax.net.ssl.keyStorePassword";
+    private static final String JAVAX_NET_SSL_TRUST_STORE = "javax.net.ssl.trustStore";
+    private static final String JAVAX_NET_SSL_TRUST_STORE_PASSWORD = "javax.net.ssl.trustStorePassword";
+
+    private static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
+    private static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+    private static final Symbol HOSTNAME = Symbol.valueOf("hostname");
+    private static final Symbol PORT =  Symbol.valueOf("port");
+
+    /*
+     * Verify that when the Open frame contains a failover server list, and the client is configured to
+     * replace the servers in its existing URI pool, it does so, leaving the server successfully connected
+     * to plus the announced failover servers.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverHandlesServerProvidedFailoverListReplace() throws Exception {
+        doFailoverHandlesServerProvidedFailoverListTestImpl(true);
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list, and the client is configured to
+     * add the servers to its existing URI pool, it does so.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverHandlesServerProvidedFailoverListAdd() throws Exception {
+        doFailoverHandlesServerProvidedFailoverListTestImpl(false);
+    }
+
+    private void doFailoverHandlesServerProvidedFailoverListTestImpl(boolean replace) throws Exception {
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+             TestAmqpPeer backupPeer1 = new TestAmqpPeer();
+             TestAmqpPeer backupPeer2 = new TestAmqpPeer();) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeer1URI = createPeerURI(backupPeer1);
+            final URI backupPeer2URI = createPeerURI(backupPeer2);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup1 is at: {}", backupPeer1URI);
+            LOG.info("Backup2 is at: {}", backupPeer2URI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup1 = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup2 = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            String failoverParams = null;
+            if (replace) {
+                failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=REPLACE";
+            } else {
+                failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=ADD";
+            }
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeer1URI, remoteURI)) {
+                        connectedToBackup1.countDown();
+                    } else if (isExpectedHost(backupPeer2URI, remoteURI)) {
+                        connectedToBackup2.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup1 advertised
+            Map<Symbol,Object> backupPeer1Details = new HashMap<>();
+            backupPeer1Details.put(NETWORK_HOST, "localhost");
+            backupPeer1Details.put(PORT, backupPeer1.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeer1Details);
+
+            Map<Symbol,Object> server1ConnectionProperties = new HashMap<Symbol, Object>();
+            server1ConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(server1ConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial peer and the backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(backupPeer1URI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Set the backup1 to expect a connection, have the failover list containing the backup2 advertised
+            Map<Symbol,Object> backupPeer2Details = new HashMap<>();
+            backupPeer2Details.put(NETWORK_HOST, "localhost");
+            backupPeer2Details.put(PORT, backupPeer2.getServerPort());
+
+            List<Map<Symbol, Object>> backup1FailoverServerList = new ArrayList<Map<Symbol, Object>>();
+            backup1FailoverServerList.add(backupPeer2Details);
+
+            Map<Symbol,Object> backup1serverConnectionProperties = new HashMap<Symbol, Object>();
+            backup1serverConnectionProperties.put(FAILOVER_SERVER_LIST, backup1FailoverServerList);
+
+            backupPeer1.expectSaslAnonymous();
+            backupPeer1.expectOpen(backup1serverConnectionProperties);
+            backupPeer1.expectBegin();
+
+            // Kill the primary peer
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup1 peer", connectedToBackup1.await(5, TimeUnit.SECONDS));
+            assertEquals("Should not yet connect to backup2 peer", 1, connectedToBackup2.getCount());
+
+            // Verify the failover URIs are as expected
+            List<URI> afterFirstReconnectFailoverURIs = new ArrayList<>();
+            if (replace) {
+                // Now containing backup1 and backup2 peers
+                afterFirstReconnectFailoverURIs.add(backupPeer1URI);
+                afterFirstReconnectFailoverURIs.add(backupPeer2URI);
+            } else {
+                // Now containing primary, backup1, and backup2 peers
+                afterFirstReconnectFailoverURIs.add(primaryPeerURI);
+                afterFirstReconnectFailoverURIs.add(backupPeer1URI);
+                afterFirstReconnectFailoverURIs.add(backupPeer2URI);
+            }
+
+            assertFailoverURIList(connection, afterFirstReconnectFailoverURIs);
+
+            // Set the backup2 to expect a connection
+            backupPeer2.expectSaslAnonymous();
+            backupPeer2.expectOpen();
+            backupPeer2.expectBegin();
+
+            // Kill the backup1 peer
+            backupPeer1.close();
+
+            assertTrue("Should connect to backup2 peer", connectedToBackup2.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected
+            List<URI> afterSecondReconnectFailoverURIs = new ArrayList<>();
+            if (replace) {
+                // Still containing backup1 and backup2 peers
+                afterSecondReconnectFailoverURIs.add(backupPeer1URI);
+                afterSecondReconnectFailoverURIs.add(backupPeer2URI);
+            } else {
+                // Still containing primary, backup1, and backup2 peers
+                afterSecondReconnectFailoverURIs.add(primaryPeerURI);
+                afterSecondReconnectFailoverURIs.add(backupPeer1URI);
+                afterSecondReconnectFailoverURIs.add(backupPeer2URI);
+            }
+
+            assertFailoverURIList(connection, afterSecondReconnectFailoverURIs);
+
+            backupPeer2.expectClose();
+            connection.close();
+            backupPeer2.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list, and the client is configured to ignore it,
+     * no change occurs in the failover URIs in use by the client after connecting.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverHandlesServerProvidedFailoverListIgnore() throws Exception {
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer();) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            LOG.info("Peer is at: {}", primaryPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            String failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=IGNORE";
+
+            // We only give it the primary peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> primaryPeerOnlyFailoverURIs = new ArrayList<>();
+            primaryPeerOnlyFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, primaryPeerOnlyFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing another server
+            Map<Symbol,Object> otherPeerDetails = new HashMap<>();
+            otherPeerDetails.put(NETWORK_HOST, "testhost");
+            otherPeerDetails.put(PORT, "4567");
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(otherPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the existing failover URIs are as expected, still the initial peer only
+            assertFailoverURIList(connection, primaryPeerOnlyFailoverURIs);
+
+            primaryPeer.expectClose();
+            connection.close();
+            primaryPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list, and it specifies an AMQP hostname in
+     * a particular servers details, the hostname is used when failover occurs.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverHandlesServerProvidedFailoverListWithHostname() throws Exception {
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+             TestAmqpPeer backupPeer = new TestAmqpPeer();) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup1 advertised
+            Map<Symbol,Object> backupPeer1Details = new HashMap<>();
+            backupPeer1Details.put(NETWORK_HOST, "localhost");
+            backupPeer1Details.put(PORT, backupPeer.getServerPort());
+            String myAmqpVhost = "myAmqpHostname";
+            backupPeer1Details.put(HOSTNAME, myAmqpVhost);
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeer1Details);
+
+            Map<Symbol,Object> server1ConnectionProperties = new HashMap<Symbol, Object>();
+            server1ConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(server1ConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial peer and the backup (with vhost details)
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(new URI(backupPeerURI.toString() + "?amqp.vhost=" + myAmqpVhost));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Verify the client fails over to the advertised backup, and uses the correct AMQP hostname when doing so
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen(null, Matchers.equalTo(myAmqpVhost), false);
+            backupPeer.expectBegin();
+
+            primaryPeer.close();
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
+     * system properties the redirect uses those properties to connect to the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredBySystemProperties() throws Exception {
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, null, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup advertised
+            Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial peer and the backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(backupPeerURI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Verify the client fails over to the advertised backup, and uses the correct AMQP hostname when doing so
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
+     * URI options on the AMQP URI the redirect uses those properties to connect to the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredByTransportOptions() throws Exception {
+        TransportSslOptions sslOptions = new TransportSslOptions();
+        sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        sslOptions.setKeyStorePassword(PASSWORD);
+        sslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(sslOptions);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is
+            // created
+            primaryPeer.expectSaslAnonymous();
+
+            String connectionOptions = "transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+                                       "transport.trustStorePassword=" + PASSWORD;
+
+            Map<String, String> expectedUriOptions = new LinkedHashMap<>();
+            expectedUriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
+            expectedUriOptions.put("transport.trustStorePassword", PASSWORD);
+
+            // We only give it the primary/dropping peer details. It can only
+            // connect to the backup
+            // peer by identifying the details in the announced
+            // failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(connectionOptions, null, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial
+            // peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover
+            // list containing the backup advertised
+            Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial
+            // peer and the backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+            afterOpenFailoverURIs.add(URISupport.applyParameters(backupPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Verify the client fails over to the advertised backup, and uses
+            // the correct AMQP hostname when doing so
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * configured with the Failover URI with nested options the redirect uses those properties to
+     * connect to the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredByNestedTransportOptions() throws Exception {
+        TransportSslOptions sslOptions = new TransportSslOptions();
+        sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        sslOptions.setKeyStorePassword(PASSWORD);
+        sslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(sslOptions);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is
+            // created
+            primaryPeer.expectSaslAnonymous();
+
+            String failoverOptions = "?failover.nested.transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+                "failover.nested.transport.trustStorePassword=" + PASSWORD;
+
+            Map<String, String> expectedUriOptions = new LinkedHashMap<>();
+            expectedUriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
+            expectedUriOptions.put("transport.trustStorePassword", PASSWORD);
+
+            // We only give it the primary/dropping peer details. It can only
+            // connect to the backup
+            // peer by identifying the details in the announced
+            // failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial
+            // peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover
+            // list containing the backup advertised
+            Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial
+            // peer and the backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+            afterOpenFailoverURIs.add(URISupport.applyParameters(backupPeerURI, expectedUriOptions));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Verify the client fails over to the advertised backup, and uses
+            // the correct AMQP hostname when doing so
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * configured with with a custom SSLContext the redirect uses those properties to connect to
+     * the new host.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverUsingSSLConfiguredByCustomSSLContext() throws Exception {
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        TransportSslOptions clientSslOptions = new TransportSslOptions();
+        clientSslOptions.setKeyStoreLocation(CLIENT_JKS_KEYSTORE);
+        clientSslOptions.setTrustStoreLocation(CLIENT_JKS_TRUSTSTORE);
+        clientSslOptions.setKeyStorePassword(PASSWORD);
+        clientSslOptions.setTrustStorePassword(PASSWORD);
+
+        SSLContext clientSslContext = TransportSupport.createSslContext(clientSslOptions);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+             TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            final URI backupPeerURI = createPeerURI(backupPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+            LOG.info("Backup is at: {}", backupPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+            final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is
+            // created
+            primaryPeer.expectSaslAnonymous();
+
+            // We only give it the primary/dropping peer details. It can only
+            // connect to the backup peer by identifying the details in the announced
+            // failover-server-list.
+            final JmsConnectionFactory factory = new JmsConnectionFactory(
+                    "failover:(amqps://localhost:" + primaryPeer.getServerPort() + ")");
+            factory.setSslContext(clientSslContext);
+
+            JmsConnection connection = (JmsConnection) factory.createConnection();
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                    if (isExpectedHost(backupPeerURI, remoteURI)) {
+                        connectedToBackup.countDown();
+                    }
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(primaryPeerURI);
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover
+            // list containing the backup advertised
+            Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial
+            // peer and the backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(primaryPeerURI);
+            afterOpenFailoverURIs.add(backupPeerURI);
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            // Verify the client fails over to the advertised backup, and uses
+            // the correct AMQP hostname when doing so
+            backupPeer.expectSaslAnonymous();
+            backupPeer.expectOpen();
+            backupPeer.expectBegin();
+
+            primaryPeer.close();
+
+            assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+            backupPeer.waitForAllHandlersToComplete(3000);
+
+            backupPeer.expectClose();
+            connection.close();
+            backupPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * that a remote listed in the open frame failover list is ignored when insecure redirects are
+     * prohibited.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverIgnoresInsecureServerWhenNotConfiguredToAllow() throws Exception {
+        doTestFailoverHandlingOfInsecureRedirectAdvertisement(false);
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via SSL
+     * that a remote listed in the open frame failover list is accepted when insecure redirects are
+     * allowed.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverAcceptsInsecureServerWhenConfiguredToAllow() throws Exception {
+        doTestFailoverHandlingOfInsecureRedirectAdvertisement(true);
+    }
+
+    private void doTestFailoverHandlingOfInsecureRedirectAdvertisement(boolean allow) throws Exception {
+
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false)) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=" + allow;
+            String connectionOptions = "amqp.allowNonSecureRedirects=" + allow;
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup advertised
+            Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, 5673);
+            backupPeerDetails.put(SCHEME, "amqp");
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+            primaryPeer.expectClose();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial peer and the backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+            if (allow) {
+                afterOpenFailoverURIs.add(new URI("amqp://localhost:5673?" + connectionOptions));
+            }
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            connection.close();
+
+            primaryPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via
+     * the 'amqp' transport and the redirect contains a 'ws' scheme that failover reconnect list
+     * is updated to contain the 'amqpws' redirect.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverAcceptsUpdateUsingTransportSchemeWS() throws Exception {
+        doTestFailoverAcceptsUpdateUsingTransportSchemes("ws", "amqpws");
+    }
+
+    /*
+     * Verify that when the Open frame contains a failover server list and we are connected via
+     * the 'amqp' transport and the redirect contains a 'ws' scheme that failover reconnect list
+     * is updated to contain the 'amqpws' redirect.
+     */
+    @Test(timeout = 20000)
+    public void testFailoverAcceptsUpdateUsingTransportSchemeWSS() throws Exception {
+        doTestFailoverAcceptsUpdateUsingTransportSchemes("wss", "amqpwss");
+    }
+
+    private void doTestFailoverAcceptsUpdateUsingTransportSchemes(String transportScheme, String expected) throws Exception {
+
+        TransportSslOptions serverSslOptions = new TransportSslOptions();
+        serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+        serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+        serverSslOptions.setKeyStorePassword(PASSWORD);
+        serverSslOptions.setTrustStorePassword(PASSWORD);
+        serverSslOptions.setVerifyHost(false);
+
+        SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+        setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+        try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false)) {
+
+            final URI primaryPeerURI = createPeerURI(primaryPeer);
+            LOG.info("Primary is at: {}", primaryPeerURI);
+
+            final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+            // Expect the authentication as soon as the connection object is created
+            primaryPeer.expectSaslAnonymous();
+
+            // Allow non-secure redirects for this test for simplicity.
+            String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=true";
+            String connectionOptions = "amqp.allowNonSecureRedirects=true";
+
+            // We only give it the primary/dropping peer details. It can only connect to the backup
+            // peer by identifying the details in the announced failover-server-list.
+            final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (isExpectedHost(primaryPeerURI, remoteURI)) {
+                        connectedToPrimary.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Reestablished: {}", remoteURI);
+                }
+            });
+
+            // Verify the existing failover URIs are as expected, the initial peer only
+            List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+            beforeOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+
+            assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+            // Set the primary up to expect the connection, have the failover list containing the backup advertised
+            Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+            backupPeerDetails.put(NETWORK_HOST, "localhost");
+            backupPeerDetails.put(PORT, 5673);
+            backupPeerDetails.put(SCHEME, transportScheme);
+
+            List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+            failoverServerList.add(backupPeerDetails);
+
+            Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+            serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+            primaryPeer.expectOpen(serverConnectionProperties);
+            primaryPeer.expectBegin();
+            primaryPeer.expectClose();
+
+            // Provoke the actual AMQP connection
+            connection.start();
+
+            assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+            // Verify the failover URIs are as expected, now containing initial peer and the backup1
+            List<URI> afterOpenFailoverURIs = new ArrayList<>();
+            afterOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+            afterOpenFailoverURIs.add(new URI(expected + "://localhost:5673?" + connectionOptions));
+
+            assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+            connection.close();
+
+            primaryPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    private void setSslSystemPropertiesForCurrentTest(String keystore, String keystorePassword, String truststore, String truststorePassword) {
+        setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE, keystore);
+        setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE_PASSWORD, keystorePassword);
+        setTestSystemProperty(JAVAX_NET_SSL_TRUST_STORE, truststore);
+        setTestSystemProperty(JAVAX_NET_SSL_TRUST_STORE_PASSWORD, truststorePassword);
+    }
+
+    private void assertFailoverURIList(JmsConnection connection, List<URI> expectedURIs) throws Exception {
+        FailoverProvider provider = getFailoverProvider(connection);
+
+        Field urisField = provider.getClass().getDeclaredField("uris");
+        urisField.setAccessible(true);
+        Object urisObj = urisField.get(provider);
+
+        assertNotNull("Expected to get a uri pool instance", urisObj);
+        assertTrue("Unexpected uri pool type: " + urisObj.getClass(), urisObj instanceof FailoverUriPool);
+        FailoverUriPool uriPool = (FailoverUriPool) urisObj;
+
+        List<URI> current = uriPool.getList();
+        assertEquals(expectedURIs, current);
+    }
+
+    private FailoverProvider getFailoverProvider(JmsConnection connection) throws Exception {
+        Field field = connection.getClass().getDeclaredField("provider");
+        field.setAccessible(true);
+        Object providerObj = field.get(connection);
+
+        assertNotNull("Expected to get a provdier instance", providerObj);
+        assertTrue("Unexpected provider type: " + providerObj.getClass(), providerObj instanceof FailoverProvider);
+        FailoverProvider provider = (FailoverProvider) providerObj;
+        return provider;
+    }
+
+    private JmsConnection establishAnonymousConnecton(String failoverParams, TestAmqpPeer... peers) throws Exception {
+        return establishAnonymousConnecton(null, failoverParams, peers);
+    }
+
+    private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, TestAmqpPeer... peers) throws Exception {
+        return establishAnonymousConnecton(connectionParams, failoverParams, false, peers);
+    }
+
+    private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, boolean ssl, TestAmqpPeer... peers) throws Exception {
+        if (peers.length == 0) {
+            throw new IllegalArgumentException("No test peers were given, at least 1 required");
+        }
+
+        String remoteURI = "failover:(";
+        boolean first = true;
+        for (TestAmqpPeer peer : peers) {
+            if (!first) {
+                remoteURI += ",";
+            }
+            remoteURI += createPeerURI(peer, connectionParams).toString();
+            first = false;
+        }
+
+        if (failoverParams == null) {
+            remoteURI += ")?failover.maxReconnectAttempts=10";
+        } else {
+            remoteURI += ")" + (failoverParams.startsWith("?") ? "" : "?") + failoverParams;
+        }
+
+        ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        Connection connection = factory.createConnection();
+
+        return (JmsConnection) connection;
+    }
+
+    private URI createPeerURI(TestAmqpPeer peer) throws Exception {
+        return createPeerURI(peer, null);
+    }
+
+    private URI createPeerURI(TestAmqpPeer peer, String params) throws Exception {
+        String scheme = peer.isSSL() ? "amqps" : "amqp";
+        URI result = new URI(scheme, "localhost:" + peer.getServerPort(), null, null, null);
+
+        Map<String, String> queryParameters = PropertyUtil.parseQuery(params);
+
+        return URISupport.applyParameters(result, queryParameters);
+    }
+
+    private boolean isExpectedHost(URI expected, URI actual) {
+        if (!expected.getHost().equals(actual.getHost())) {
+            LOG.info("Expected host {} but got host {}", expected.getHost(), actual.getHost());
+            return false;
+        }
+
+        if (expected.getPort() != actual.getPort()) {
+            LOG.info("Expected host {} on port {} but got host {} on port {}",
+                expected.getHost(), expected.getPort(), actual.getHost(), actual.getPort());
+            return false;
+        }
+
+        return true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index f751beb..719fbc7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -19,8 +19,8 @@
 package org.apache.qpid.jms.test.testpeer;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
-import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.GLOBAL;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.arrayContaining;
 import static org.hamcrest.Matchers.equalTo;
@@ -227,6 +227,11 @@ public class TestAmqpPeer implements AutoCloseable
         return _driverRunnable.getClientSocket();
     }
 
+    public boolean isSSL()
+    {
+        return _driverRunnable.isSSL();
+    }
+
     public int getAdvertisedIdleTimeout()
     {
         return advertisedIdleTimeout;

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 0f01256..eb0b2a0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -258,4 +258,8 @@ class TestAmqpPeerRunner implements Runnable
     public boolean isSendSaslHeaderPreEmptively() {
         return _sendSaslHeaderPreEmptively;
     }
+
+    public boolean isSSL() {
+        return _serverSocket instanceof SSLServerSocket;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
index 718c422..14418ef 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
@@ -217,7 +217,7 @@ public class NettyTcpToMockServerTest extends QpidJmsTestCase {
         }
     }
 
-    @Test(timeout = 60 * 1000)
+    @Test(timeout = 20 * 1000)
     public void testConnectToWSServerWhenRedirectedWithNewPath() throws Exception {
         try (NettySimpleAmqpServer primary = createWSServer(createServerOptions());
              NettySimpleAmqpServer redirect = createWSServer(createServerOptions())) {
@@ -314,7 +314,7 @@ public class NettyTcpToMockServerTest extends QpidJmsTestCase {
     protected URI createFailoverURI(NettyServer server) throws Exception {
         URI serverURI = createConnectionURI(server, null);
 
-        String failoverURI = "failover:(" + serverURI.toString() + ")";
+        String failoverURI = "failover:(" + serverURI.toString() + "?amqp.vhost=localhost)?failover.maxReconnectAttempts=3";
 
         return new URI(failoverURI);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
index ba2674e..3e16c50 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
@@ -272,14 +272,29 @@ public class URISupportTest {
         parameters.put("t.proxyHost", "localhost");
         parameters.put("t.proxyPort", "80");
 
+        uri = URISupport.applyParameters(uri, parameters, "t.");
+        Map<String,String> appliedParameters = URISupport.parseParameters(uri);
+        assertEquals("all params applied with no prefix", 3, appliedParameters.size());
+        verifyParams(appliedParameters);
+    }
+
+    @Test
+    public void testApplyParametersOverwritesOriginalParameters() throws Exception {
+        URI uri = new URI("http://0.0.0.0:61616?proxyHost=host&proxyPort=21&timeout=1000");
+
+        Map<String,String> parameters = new HashMap<String, String>();
+        parameters.put("proxyHost", "localhost");
+        parameters.put("proxyPort", "80");
+
         uri = URISupport.applyParameters(uri, parameters);
         Map<String,String> appliedParameters = URISupport.parseParameters(uri);
         assertEquals("all params applied with no prefix", 3, appliedParameters.size());
+        verifyParams(appliedParameters);
     }
 
     private void verifyParams(Map<String,String> parameters) {
-        assertEquals(parameters.get("proxyHost"), "localhost");
-        assertEquals(parameters.get("proxyPort"), "80");
+        assertEquals("localhost", parameters.get("proxyHost"));
+        assertEquals("80", parameters.get("proxyPort"));
     }
 
     //---- isCompositeURI ----------------------------------------------------//


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org