You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/02/24 22:36:36 UTC
[2/3] qpid-jms git commit: QPIDJMS-267 Failover discovery via
connection properties
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 63702c9..b26cb5f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -75,54 +75,54 @@ public class AmqpProviderTest extends QpidJmsTestCase {
@Test(timeout=20000)
public void testCreate() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
}
@Test(timeout=20000, expected=RuntimeException.class)
public void testGetMessageFactoryTrowsWhenNotConnected() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
provider.getMessageFactory();
}
@Test(timeout=20000)
public void testUnInitializedProviderReturnsDefaultConnectTimeout() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
assertEquals(JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT, provider.getConnectTimeout());
}
@Test(timeout=20000)
public void testUnInitializedProviderReturnsDefaultCloseTimeout() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
assertEquals(JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT, provider.getCloseTimeout());
}
@Test(timeout=20000)
public void testUnInitializedProviderReturnsDefaultSendTimeout() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
assertEquals(JmsConnectionInfo.DEFAULT_SEND_TIMEOUT, provider.getSendTimeout());
}
@Test(timeout=20000)
public void testUnInitializedProviderReturnsDefaultRequestTimeout() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
assertEquals(JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT, provider.getRequestTimeout());
}
@Test(timeout=20000)
public void testGetDefaultDrainTimeout() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
assertEquals(TimeUnit.MINUTES.toMillis(1), provider.getDrainTimeout());
}
@Test(timeout=20000)
public void testGetDefaultIdleTimeout() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
assertEquals(TimeUnit.MINUTES.toMillis(1), provider.getIdleTimeout());
}
@Test(timeout=20000)
public void testEnableTraceFrames() throws Exception {
- provider = new AmqpProvider(getDefaultURI());
+ provider = new AmqpProviderFactory().createProvider(getDefaultURI());
TransportImpl transport = (TransportImpl) provider.getProtonTransport();
assertNotNull(transport);
assertNull(transport.getProtocolTracer());
@@ -131,19 +131,13 @@ public class AmqpProviderTest extends QpidJmsTestCase {
}
@Test(timeout=20000)
- public void testConnectWithUnknownProtocol() throws Exception {
- try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
- provider = new AmqpProvider(getPeerURI(testPeer));
- provider.setTransportType("ftp");
- try {
- provider.connect(connectionInfo);
- fail("Should have failed to connect.");
- } catch (Exception ex) {
- }
-
- provider.close();
-
- testPeer.waitForAllHandlersToComplete(1000);
+ public void testCreateFailsWithUnknownProtocol() throws Exception {
+ try {
+ AmqpProviderFactory factory = new AmqpProviderFactory();
+ factory.setTransportScheme("ftp");
+ factory.createProvider(new URI("ftp://localhost:5672"));
+ fail("Should have failed to connect.");
+ } catch (Exception ex) {
}
}
@@ -153,7 +147,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
URI peerURI = getPeerURI(testPeer);
testPeer.close();
- provider = new AmqpProvider(peerURI);
+ provider = new AmqpProviderFactory().createProvider(peerURI);
try {
provider.connect(connectionInfo);
fail("Should have failed to connect.");
@@ -166,7 +160,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
public void testDisableSaslLayer() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
provider.setSaslLayer(false);
provider.connect(connectionInfo);
@@ -185,7 +179,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
testPeer.expectSaslAnonymous();
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
TransportImpl transport = (TransportImpl) provider.getProtonTransport();
@@ -210,7 +204,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
testPeer.expectSaslAnonymous();
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
TransportImpl transport = (TransportImpl) provider.getProtonTransport();
@@ -235,7 +229,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
testPeer.expectSaslAnonymous();
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
TransportImpl transport = (TransportImpl) provider.getProtonTransport();
@@ -259,7 +253,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
testPeer.expectSaslAnonymous();
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
provider.connect(connectionInfo);
assertNull(provider.getProviderListener());
@@ -294,7 +288,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
testPeer.expectSaslAnonymous();
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
provider.connect(connectionInfo);
assertTrue(provider.toString().contains("localhost"));
assertTrue(provider.toString().contains(String.valueOf(testPeer.getServerPort())));
@@ -315,7 +309,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
testPeer.expectOpen();
testPeer.expectClose();
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
provider.connect(connectionInfo);
provider.close();
@@ -352,7 +346,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
connectionInfo.setUsername(TEST_USERNAME);
connectionInfo.setPassword(TEST_PASSWORD);
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
testPeer.expectSaslPlain(TEST_USERNAME, TEST_PASSWORD);
testPeer.expectOpen();
testPeer.expectBegin();
@@ -403,7 +397,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
private void doErrorDuringOperationFailsRequesTTestImpl(Op operation) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
- provider = new AmqpProvider(getPeerURI(testPeer));
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
final AtomicBoolean errorThrown = new AtomicBoolean();
JmsResource resourceInfo = new JmsAbstractResource() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
index 70ebd11..15f29d8 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSupportTest.java
@@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
@@ -30,46 +32,55 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.junit.Test;
+import org.mockito.Mockito;
public class AmqpSupportTest {
@Test
- public void testCreateRedirectionException() {
+ public void testCreateRedirectionException() throws URISyntaxException {
ErrorCondition condition = new ErrorCondition();
+ AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+ Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
Map<Symbol, Object> info = new HashMap<>();
info.put(AmqpSupport.PORT, "5672");
info.put(AmqpSupport.OPEN_HOSTNAME, "localhost.localdomain");
info.put(AmqpSupport.NETWORK_HOST, "localhost");
info.put(AmqpSupport.SCHEME, "amqp");
- info.put(AmqpSupport.PATH, "websocket");
+ info.put(AmqpSupport.PATH, "/websocket");
condition.setInfo(info);
Symbol error = AmqpError.INTERNAL_ERROR;
String message = "Failed to connect";
- Exception result = AmqpSupport.createRedirectException(error, message, condition);
+ Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
assertNotNull(result);
assertTrue(result instanceof ProviderRedirectedException);
ProviderRedirectedException pre = (ProviderRedirectedException) result;
- assertEquals(5672, pre.getPort());
- assertEquals("localhost.localdomain", pre.getHostname());
- assertEquals("localhost", pre.getNetworkHost());
- assertEquals("amqp", pre.getScheme());
- assertEquals("websocket", pre.getPath());
+ URI redirection = pre.getRedirectionURI();
+
+ assertEquals(5672, redirection.getPort());
+ assertTrue("localhost.localdomain", redirection.getQuery().contains("amqp.vhost=localhost.localdomain"));
+ assertEquals("localhost", redirection.getHost());
+ assertEquals("amqp", redirection.getScheme());
+ assertEquals("/websocket", redirection.getPath());
}
@Test
- public void testCreateRedirectionExceptionWithNoRedirectInfo() {
+ public void testCreateRedirectionExceptionWithNoRedirectInfo() throws URISyntaxException {
+ AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+ Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
ErrorCondition condition = new ErrorCondition();
Symbol error = AmqpError.INTERNAL_ERROR;
String message = "Failed to connect";
- Exception result = AmqpSupport.createRedirectException(error, message, condition);
+ Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
assertNotNull(result);
assertFalse(result instanceof ProviderRedirectedException);
@@ -77,7 +88,10 @@ public class AmqpSupportTest {
}
@Test
- public void testCreateRedirectionExceptionWithNoNetworkHost() {
+ public void testCreateRedirectionExceptionWithNoNetworkHost() throws URISyntaxException {
+ AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+ Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
ErrorCondition condition = new ErrorCondition();
Map<Symbol, Object> info = new HashMap<>();
@@ -91,7 +105,7 @@ public class AmqpSupportTest {
Symbol error = AmqpError.INTERNAL_ERROR;
String message = "Failed to connect";
- Exception result = AmqpSupport.createRedirectException(error, message, condition);
+ Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
assertNotNull(result);
assertFalse(result instanceof ProviderRedirectedException);
@@ -99,7 +113,10 @@ public class AmqpSupportTest {
}
@Test
- public void testCreateRedirectionExceptionWithEmptyNetworkHost() {
+ public void testCreateRedirectionExceptionWithEmptyNetworkHost() throws URISyntaxException {
+ AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+ Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
ErrorCondition condition = new ErrorCondition();
Map<Symbol, Object> info = new HashMap<>();
@@ -114,7 +131,7 @@ public class AmqpSupportTest {
Symbol error = AmqpError.INTERNAL_ERROR;
String message = "Failed to connect";
- Exception result = AmqpSupport.createRedirectException(error, message, condition);
+ Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
assertNotNull(result);
assertFalse(result instanceof ProviderRedirectedException);
@@ -122,7 +139,10 @@ public class AmqpSupportTest {
}
@Test
- public void testCreateRedirectionExceptionWithInvalidPort() {
+ public void testCreateRedirectionExceptionWithInvalidPort() throws URISyntaxException {
+ AmqpProvider mockProvider = Mockito.mock(AmqpProvider.class);
+ Mockito.when(mockProvider.getRemoteURI()).thenReturn(new URI("amqp://localhost:5672"));
+
ErrorCondition condition = new ErrorCondition();
Map<Symbol, Object> info = new HashMap<>();
@@ -137,7 +157,7 @@ public class AmqpSupportTest {
Symbol error = AmqpError.INTERNAL_ERROR;
String message = "Failed to connect";
- Exception result = AmqpSupport.createRedirectException(error, message, condition);
+ Exception result = AmqpSupport.createRedirectException(mockProvider, error, message, condition);
assertNotNull(result);
assertFalse(result instanceof ProviderRedirectedException);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
index e543eec..1120285 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
@@ -419,4 +419,31 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
assertEquals(SEND_TIMEOUT, provider.getSendTimeout());
assertEquals(REQUEST_TIMEOUT, provider.getRequestTimeout());
}
+
+ @Test(timeout = 30000)
+ public void testAmqpOpenServerListBehaviourDefault() {
+ provider = new FailoverProvider(uris);
+ assertEquals("REPLACE", provider.getAmqpOpenServerListBehaviour());
+ }
+
+ @Test(timeout = 30000)
+ public void testSetGetAmqpOpenServerListBehaviour() {
+ provider = new FailoverProvider(uris);
+ String behaviour = "ADD";
+ assertFalse(behaviour.equals(provider.getAmqpOpenServerListBehaviour()));
+
+ provider.setAmqpOpenServerListBehaviour(behaviour);
+ assertEquals(behaviour, provider.getAmqpOpenServerListBehaviour());
+ }
+
+ @Test(timeout = 30000)
+ public void testSetInvalidAmqpOpenServerListBehaviourThrowsIAE() {
+ provider = new FailoverProvider(uris);
+ try {
+ provider.setAmqpOpenServerListBehaviour("invalid");
+ fail("no exception was thrown");
+ } catch (IllegalArgumentException iae) {
+ // Expected
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
index 5c9bef6..bcf9d2c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverRedirectTest.java
@@ -22,6 +22,7 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT;
import static org.junit.Assert.assertTrue;
import java.net.URI;
+import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -29,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
@@ -57,7 +57,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
TestAmqpPeer redirectedPeer = new TestAmqpPeer();) {
final CountDownLatch connected = new CountDownLatch(1);
- final String redirectURI = createPeerURI(redirectedPeer);
+ final URI redirectURI = createPeerURI(redirectedPeer);
LOG.info("Backup peer is at: {}", redirectURI);
redirectedPeer.expectSaslAnonymous();
@@ -76,7 +76,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
@Override
public void onConnectionEstablished(URI remoteURI) {
LOG.info("Connection Established: {}", remoteURI);
- if (redirectURI.equals(remoteURI.toString())) {
+ if (isExpectedHost(redirectURI, remoteURI)) {
connected.countDown();
}
}
@@ -100,8 +100,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
final CountDownLatch connectedToPrimary = new CountDownLatch(1);
final CountDownLatch connectedToBackup = new CountDownLatch(1);
- final String rejectingURI = createPeerURI(rejectingPeer);
- final String redirectURI = createPeerURI(redirectedPeer);
+ final URI rejectingURI = createPeerURI(rejectingPeer);
+ final URI redirectURI = createPeerURI(redirectedPeer);
LOG.info("Primary is at {}: Backup peer is at: {}", rejectingURI, redirectURI);
redirectedPeer.expectSaslAnonymous();
@@ -123,7 +123,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
@Override
public void onConnectionEstablished(URI remoteURI) {
LOG.info("Connection Established: {}", remoteURI);
- if (remoteURI.toString().equals(rejectingURI)) {
+ if (isExpectedHost(rejectingURI, remoteURI)) {
connectedToPrimary.countDown();
}
}
@@ -131,7 +131,7 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
@Override
public void onConnectionRestored(URI remoteURI) {
LOG.info("Connection Reestablished: {}", remoteURI);
- if (remoteURI.toString().equals(redirectURI)) {
+ if (isExpectedHost(redirectURI, remoteURI)) {
connectedToBackup.countDown();
}
}
@@ -149,8 +149,8 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
}
}
- private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws JMSException {
- final String remoteURI = "failover:(" + createPeerURI(testPeer) + ")";
+ private JmsConnection establishAnonymousConnecton(TestAmqpPeer testPeer) throws Exception {
+ final String remoteURI = "failover:(" + createPeerURI(testPeer).toString() + ")";
ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
Connection connection = factory.createConnection();
@@ -158,7 +158,22 @@ public class FailoverRedirectTest extends QpidJmsTestCase {
return (JmsConnection) connection;
}
- private String createPeerURI(TestAmqpPeer peer) {
- return "amqp://localhost:" + peer.getServerPort();
+ private boolean isExpectedHost(URI expected, URI actual) {
+ if (!expected.getHost().equals(actual.getHost())) {
+ LOG.info("Expected host {} but got host {}", expected.getHost(), actual.getHost());
+ return false;
+ }
+
+ if (expected.getPort() != actual.getPort()) {
+ LOG.info("Expected host {} on port {} but got host {} on port {}",
+ expected.getHost(), expected.getPort(), actual.getHost(), actual.getPort());
+ return false;
+ }
+
+ return true;
+ }
+
+ private URI createPeerURI(TestAmqpPeer peer) throws URISyntaxException {
+ return new URI("amqp://localhost:" + peer.getServerPort());
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
index 39917cf..b59e1b6 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverUriPoolTest.java
@@ -29,6 +29,7 @@ import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -299,6 +300,33 @@ public class FailoverUriPoolTest extends QpidJmsTestCase {
}
@Test
+ public void testAddAllHandlesNulls() throws URISyntaxException {
+ FailoverUriPool pool = new FailoverUriPool(uris, null);
+ pool.setRandomize(false);
+ pool.addAll(null);
+
+ assertEquals(uris.size(), pool.size());
+ }
+
+ @Test
+ public void testAddAllHandlesEmpty() throws URISyntaxException {
+ FailoverUriPool pool = new FailoverUriPool(uris, null);
+ pool.setRandomize(false);
+ pool.addAll(Collections.emptyList());
+
+ assertEquals(uris.size(), pool.size());
+ }
+
+ @Test
+ public void testAddAll() throws URISyntaxException {
+ FailoverUriPool pool = new FailoverUriPool(null, null);
+ pool.setRandomize(false);
+ pool.addAll(uris);
+
+ assertEquals(uris.size(), pool.size());
+ }
+
+ @Test
public void testRemoveURIFromPool() throws URISyntaxException {
FailoverUriPool pool = new FailoverUriPool(uris, null);
pool.setRandomize(false);
@@ -526,4 +554,16 @@ public class FailoverUriPoolTest extends QpidJmsTestCase {
return resolutionWorks;
}
+
+ @Test
+ public void testRemoveAll() throws URISyntaxException {
+ FailoverUriPool pool = new FailoverUriPool(uris, null);
+ assertEquals(uris.size(), pool.size());
+
+ pool.removeAll();
+ assertTrue(pool.isEmpty());
+ assertEquals(0, pool.size());
+
+ pool.removeAll();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
new file mode 100644
index 0000000..bdfa3e5
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverWithAmqpOpenProvidedServerListIntegrationTest.java
@@ -0,0 +1,1142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.net.ssl.SSLContext;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.transports.TransportSslOptions;
+import org.apache.qpid.jms.transports.TransportSupport;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverWithAmqpOpenProvidedServerListIntegrationTest extends QpidJmsTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FailoverWithAmqpOpenProvidedServerListIntegrationTest.class);
+
+ private static final String BROKER_JKS_KEYSTORE = "src/test/resources/broker-jks.keystore";
+ private static final String BROKER_JKS_TRUSTSTORE = "src/test/resources/broker-jks.truststore";
+ private static final String PASSWORD = "password";
+ private static final String CLIENT_JKS_KEYSTORE = "src/test/resources/client-jks.keystore";
+ private static final String CLIENT_JKS_TRUSTSTORE = "src/test/resources/client-jks.truststore";
+
+ private static final String JAVAX_NET_SSL_KEY_STORE = "javax.net.ssl.keyStore";
+ private static final String JAVAX_NET_SSL_KEY_STORE_PASSWORD = "javax.net.ssl.keyStorePassword";
+ private static final String JAVAX_NET_SSL_TRUST_STORE = "javax.net.ssl.trustStore";
+ private static final String JAVAX_NET_SSL_TRUST_STORE_PASSWORD = "javax.net.ssl.trustStorePassword";
+
+ private static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
+ private static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
+ private static final Symbol HOSTNAME = Symbol.valueOf("hostname");
+ private static final Symbol PORT = Symbol.valueOf("port");
+
+ /*
+ * Verify that when the Open frame contains a failover server list, and the client is configured to
+ * replace the servers in its existing URI pool, it does so, leaving the server successfully connected
+ * to plus the announced failover servers.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverHandlesServerProvidedFailoverListReplace() throws Exception {
+ doFailoverHandlesServerProvidedFailoverListTestImpl(true);
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list, and the client is configured to
+ * add the servers to its existing URI pool, it does so.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverHandlesServerProvidedFailoverListAdd() throws Exception {
+ doFailoverHandlesServerProvidedFailoverListTestImpl(false);
+ }
+
+ private void doFailoverHandlesServerProvidedFailoverListTestImpl(boolean replace) throws Exception {
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+ TestAmqpPeer backupPeer1 = new TestAmqpPeer();
+ TestAmqpPeer backupPeer2 = new TestAmqpPeer();) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ final URI backupPeer1URI = createPeerURI(backupPeer1);
+ final URI backupPeer2URI = createPeerURI(backupPeer2);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+ LOG.info("Backup1 is at: {}", backupPeer1URI);
+ LOG.info("Backup2 is at: {}", backupPeer2URI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup1 = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup2 = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is created
+ primaryPeer.expectSaslAnonymous();
+
+ String failoverParams = null;
+ if (replace) {
+ failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=REPLACE";
+ } else {
+ failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=ADD";
+ }
+
+ // We only give it the primary/dropping peer details. It can only connect to the backup
+ // peer by identifying the details in the announced failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ if (isExpectedHost(backupPeer1URI, remoteURI)) {
+ connectedToBackup1.countDown();
+ } else if (isExpectedHost(backupPeer2URI, remoteURI)) {
+ connectedToBackup2.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(primaryPeerURI);
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover list containing the backup1 advertised
+ Map<Symbol,Object> backupPeer1Details = new HashMap<>();
+ backupPeer1Details.put(NETWORK_HOST, "localhost");
+ backupPeer1Details.put(PORT, backupPeer1.getServerPort());
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeer1Details);
+
+ Map<Symbol,Object> server1ConnectionProperties = new HashMap<Symbol, Object>();
+ server1ConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(server1ConnectionProperties);
+ primaryPeer.expectBegin();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial peer and the backup1
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(primaryPeerURI);
+ afterOpenFailoverURIs.add(backupPeer1URI);
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ // Set the backup1 to expect a connection, have the failover list containing the backup2 advertised
+ Map<Symbol,Object> backupPeer2Details = new HashMap<>();
+ backupPeer2Details.put(NETWORK_HOST, "localhost");
+ backupPeer2Details.put(PORT, backupPeer2.getServerPort());
+
+ List<Map<Symbol, Object>> backup1FailoverServerList = new ArrayList<Map<Symbol, Object>>();
+ backup1FailoverServerList.add(backupPeer2Details);
+
+ Map<Symbol,Object> backup1serverConnectionProperties = new HashMap<Symbol, Object>();
+ backup1serverConnectionProperties.put(FAILOVER_SERVER_LIST, backup1FailoverServerList);
+
+ backupPeer1.expectSaslAnonymous();
+ backupPeer1.expectOpen(backup1serverConnectionProperties);
+ backupPeer1.expectBegin();
+
+ // Kill the primary peer
+ primaryPeer.close();
+
+ assertTrue("Should connect to backup1 peer", connectedToBackup1.await(5, TimeUnit.SECONDS));
+ assertEquals("Should not yet connect to backup2 peer", 1, connectedToBackup2.getCount());
+
+ // Verify the failover URIs are as expected
+ List<URI> afterFirstReconnectFailoverURIs = new ArrayList<>();
+ if (replace) {
+ // Now containing backup1 and backup2 peers
+ afterFirstReconnectFailoverURIs.add(backupPeer1URI);
+ afterFirstReconnectFailoverURIs.add(backupPeer2URI);
+ } else {
+ // Now containing primary, backup1, and backup2 peers
+ afterFirstReconnectFailoverURIs.add(primaryPeerURI);
+ afterFirstReconnectFailoverURIs.add(backupPeer1URI);
+ afterFirstReconnectFailoverURIs.add(backupPeer2URI);
+ }
+
+ assertFailoverURIList(connection, afterFirstReconnectFailoverURIs);
+
+ // Set the backup2 to expect a connection
+ backupPeer2.expectSaslAnonymous();
+ backupPeer2.expectOpen();
+ backupPeer2.expectBegin();
+
+ // Kill the backup1 peer
+ backupPeer1.close();
+
+ assertTrue("Should connect to backup2 peer", connectedToBackup2.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected
+ List<URI> afterSecondReconnectFailoverURIs = new ArrayList<>();
+ if (replace) {
+ // Still containing backup1 and backup2 peers
+ afterSecondReconnectFailoverURIs.add(backupPeer1URI);
+ afterSecondReconnectFailoverURIs.add(backupPeer2URI);
+ } else {
+ // Still containing primary, backup1, and backup2 peers
+ afterSecondReconnectFailoverURIs.add(primaryPeerURI);
+ afterSecondReconnectFailoverURIs.add(backupPeer1URI);
+ afterSecondReconnectFailoverURIs.add(backupPeer2URI);
+ }
+
+ assertFailoverURIList(connection, afterSecondReconnectFailoverURIs);
+
+ backupPeer2.expectClose();
+ connection.close();
+ backupPeer2.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list, and the client is configured to ignore it,
+ * no change occurs in the failover URIs in use by the client after connecting.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverHandlesServerProvidedFailoverListIgnore() throws Exception {
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer();) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ LOG.info("Peer is at: {}", primaryPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is created
+ primaryPeer.expectSaslAnonymous();
+
+ String failoverParams = "?failover.maxReconnectAttempts=10&failover.amqpOpenServerListBehaviour=IGNORE";
+
+ // We only give it the primary peer details. It can only connect to the backup
+ // peer by identifying the details in the announced failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(failoverParams, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial peer only
+ List<URI> primaryPeerOnlyFailoverURIs = new ArrayList<>();
+ primaryPeerOnlyFailoverURIs.add(primaryPeerURI);
+
+ assertFailoverURIList(connection, primaryPeerOnlyFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover list containing another server
+ Map<Symbol,Object> otherPeerDetails = new HashMap<>();
+ otherPeerDetails.put(NETWORK_HOST, "testhost");
+ otherPeerDetails.put(PORT, "4567");
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(otherPeerDetails);
+
+ Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+ serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(serverConnectionProperties);
+ primaryPeer.expectBegin();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the existing failover URIs are as expected, still the initial peer only
+ assertFailoverURIList(connection, primaryPeerOnlyFailoverURIs);
+
+ primaryPeer.expectClose();
+ connection.close();
+ primaryPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list, and it specifies an AMQP hostname in
+ * a particular servers details, the hostname is used when failover occurs.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverHandlesServerProvidedFailoverListWithHostname() throws Exception {
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer();
+ TestAmqpPeer backupPeer = new TestAmqpPeer();) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ final URI backupPeerURI = createPeerURI(backupPeer);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+ LOG.info("Backup is at: {}", backupPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is created
+ primaryPeer.expectSaslAnonymous();
+
+ // We only give it the primary/dropping peer details. It can only connect to the backup
+ // peer by identifying the details in the announced failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(null, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ if (isExpectedHost(backupPeerURI, remoteURI)) {
+ connectedToBackup.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(primaryPeerURI);
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover list containing the backup1 advertised
+ Map<Symbol,Object> backupPeer1Details = new HashMap<>();
+ backupPeer1Details.put(NETWORK_HOST, "localhost");
+ backupPeer1Details.put(PORT, backupPeer.getServerPort());
+ String myAmqpVhost = "myAmqpHostname";
+ backupPeer1Details.put(HOSTNAME, myAmqpVhost);
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeer1Details);
+
+ Map<Symbol,Object> server1ConnectionProperties = new HashMap<Symbol, Object>();
+ server1ConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(server1ConnectionProperties);
+ primaryPeer.expectBegin();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial peer and the backup (with vhost details)
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(primaryPeerURI);
+ afterOpenFailoverURIs.add(new URI(backupPeerURI.toString() + "?amqp.vhost=" + myAmqpVhost));
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ // Verify the client fails over to the advertised backup, and uses the correct AMQP hostname when doing so
+ backupPeer.expectSaslAnonymous();
+ backupPeer.expectOpen(null, Matchers.equalTo(myAmqpVhost), false);
+ backupPeer.expectBegin();
+
+ primaryPeer.close();
+
+ backupPeer.waitForAllHandlersToComplete(3000);
+
+ backupPeer.expectClose();
+ connection.close();
+ backupPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
+ * system properties the redirect uses those properties to connect to the new host.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverUsingSSLConfiguredBySystemProperties() throws Exception {
+ TransportSslOptions serverSslOptions = new TransportSslOptions();
+ serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+ serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+ serverSslOptions.setKeyStorePassword(PASSWORD);
+ serverSslOptions.setTrustStorePassword(PASSWORD);
+ serverSslOptions.setVerifyHost(false);
+
+ SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+ setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+ TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ final URI backupPeerURI = createPeerURI(backupPeer);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+ LOG.info("Backup is at: {}", backupPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is created
+ primaryPeer.expectSaslAnonymous();
+
+ // We only give it the primary/dropping peer details. It can only connect to the backup
+ // peer by identifying the details in the announced failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(null, null, true, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ if (isExpectedHost(backupPeerURI, remoteURI)) {
+ connectedToBackup.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(primaryPeerURI);
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover list containing the backup advertised
+ Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+ backupPeerDetails.put(NETWORK_HOST, "localhost");
+ backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeerDetails);
+
+ Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+ serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(serverConnectionProperties);
+ primaryPeer.expectBegin();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial peer and the backup1
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(primaryPeerURI);
+ afterOpenFailoverURIs.add(backupPeerURI);
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ // Verify the client fails over to the advertised backup, and uses the correct AMQP hostname when doing so
+ backupPeer.expectSaslAnonymous();
+ backupPeer.expectOpen();
+ backupPeer.expectBegin();
+
+ primaryPeer.close();
+
+ assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+ backupPeer.waitForAllHandlersToComplete(3000);
+
+ backupPeer.expectClose();
+ connection.close();
+ backupPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via SSL configured with
+ * URI options on the AMQP URI the redirect uses those properties to connect to the new host.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverUsingSSLConfiguredByTransportOptions() throws Exception {
+ TransportSslOptions sslOptions = new TransportSslOptions();
+ sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+ sslOptions.setKeyStorePassword(PASSWORD);
+ sslOptions.setVerifyHost(false);
+
+ SSLContext serverSslContext = TransportSupport.createSslContext(sslOptions);
+
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+ TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ final URI backupPeerURI = createPeerURI(backupPeer);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+ LOG.info("Backup is at: {}", backupPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is
+ // created
+ primaryPeer.expectSaslAnonymous();
+
+ String connectionOptions = "transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+ "transport.trustStorePassword=" + PASSWORD;
+
+ Map<String, String> expectedUriOptions = new LinkedHashMap<>();
+ expectedUriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
+ expectedUriOptions.put("transport.trustStorePassword", PASSWORD);
+
+ // We only give it the primary/dropping peer details. It can only
+ // connect to the backup
+ // peer by identifying the details in the announced
+ // failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(connectionOptions, null, true, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ if (isExpectedHost(backupPeerURI, remoteURI)) {
+ connectedToBackup.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial
+ // peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover
+ // list containing the backup advertised
+ Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+ backupPeerDetails.put(NETWORK_HOST, "localhost");
+ backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeerDetails);
+
+ Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+ serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(serverConnectionProperties);
+ primaryPeer.expectBegin();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial
+ // peer and the backup1
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+ afterOpenFailoverURIs.add(URISupport.applyParameters(backupPeerURI, expectedUriOptions));
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ // Verify the client fails over to the advertised backup, and uses
+ // the correct AMQP hostname when doing so
+ backupPeer.expectSaslAnonymous();
+ backupPeer.expectOpen();
+ backupPeer.expectBegin();
+
+ primaryPeer.close();
+
+ assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+ backupPeer.waitForAllHandlersToComplete(3000);
+
+ backupPeer.expectClose();
+ connection.close();
+ backupPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via SSL
+ * configured with the Failover URI with nested options the redirect uses those properties to
+ * connect to the new host.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverUsingSSLConfiguredByNestedTransportOptions() throws Exception {
+ TransportSslOptions sslOptions = new TransportSslOptions();
+ sslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+ sslOptions.setKeyStorePassword(PASSWORD);
+ sslOptions.setVerifyHost(false);
+
+ SSLContext serverSslContext = TransportSupport.createSslContext(sslOptions);
+
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+ TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ final URI backupPeerURI = createPeerURI(backupPeer);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+ LOG.info("Backup is at: {}", backupPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is
+ // created
+ primaryPeer.expectSaslAnonymous();
+
+ String failoverOptions = "?failover.nested.transport.trustStoreLocation=" + CLIENT_JKS_TRUSTSTORE + "&" +
+ "failover.nested.transport.trustStorePassword=" + PASSWORD;
+
+ Map<String, String> expectedUriOptions = new LinkedHashMap<>();
+ expectedUriOptions.put("transport.trustStoreLocation", CLIENT_JKS_TRUSTSTORE);
+ expectedUriOptions.put("transport.trustStorePassword", PASSWORD);
+
+ // We only give it the primary/dropping peer details. It can only
+ // connect to the backup
+ // peer by identifying the details in the announced
+ // failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ if (isExpectedHost(backupPeerURI, remoteURI)) {
+ connectedToBackup.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial
+ // peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover
+ // list containing the backup advertised
+ Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+ backupPeerDetails.put(NETWORK_HOST, "localhost");
+ backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeerDetails);
+
+ Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+ serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(serverConnectionProperties);
+ primaryPeer.expectBegin();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial
+ // peer and the backup1
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(URISupport.applyParameters(primaryPeerURI, expectedUriOptions));
+ afterOpenFailoverURIs.add(URISupport.applyParameters(backupPeerURI, expectedUriOptions));
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ // Verify the client fails over to the advertised backup, and uses
+ // the correct AMQP hostname when doing so
+ backupPeer.expectSaslAnonymous();
+ backupPeer.expectOpen();
+ backupPeer.expectBegin();
+
+ primaryPeer.close();
+
+ assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+ backupPeer.waitForAllHandlersToComplete(3000);
+
+ backupPeer.expectClose();
+ connection.close();
+ backupPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via SSL
+ * configured with with a custom SSLContext the redirect uses those properties to connect to
+ * the new host.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverUsingSSLConfiguredByCustomSSLContext() throws Exception {
+ TransportSslOptions serverSslOptions = new TransportSslOptions();
+ serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+ serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+ serverSslOptions.setKeyStorePassword(PASSWORD);
+ serverSslOptions.setTrustStorePassword(PASSWORD);
+ serverSslOptions.setVerifyHost(false);
+
+ SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+ TransportSslOptions clientSslOptions = new TransportSslOptions();
+ clientSslOptions.setKeyStoreLocation(CLIENT_JKS_KEYSTORE);
+ clientSslOptions.setTrustStoreLocation(CLIENT_JKS_TRUSTSTORE);
+ clientSslOptions.setKeyStorePassword(PASSWORD);
+ clientSslOptions.setTrustStorePassword(PASSWORD);
+
+ SSLContext clientSslContext = TransportSupport.createSslContext(clientSslOptions);
+
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false);
+ TestAmqpPeer backupPeer = new TestAmqpPeer(serverSslContext, false);) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ final URI backupPeerURI = createPeerURI(backupPeer);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+ LOG.info("Backup is at: {}", backupPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+ final CountDownLatch connectedToBackup = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is
+ // created
+ primaryPeer.expectSaslAnonymous();
+
+ // We only give it the primary/dropping peer details. It can only
+ // connect to the backup peer by identifying the details in the announced
+ // failover-server-list.
+ final JmsConnectionFactory factory = new JmsConnectionFactory(
+ "failover:(amqps://localhost:" + primaryPeer.getServerPort() + ")");
+ factory.setSslContext(clientSslContext);
+
+ JmsConnection connection = (JmsConnection) factory.createConnection();
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ if (isExpectedHost(backupPeerURI, remoteURI)) {
+ connectedToBackup.countDown();
+ }
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(primaryPeerURI);
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover
+ // list containing the backup advertised
+ Map<Symbol, Object> backupPeerDetails = new HashMap<>();
+ backupPeerDetails.put(NETWORK_HOST, "localhost");
+ backupPeerDetails.put(PORT, backupPeer.getServerPort());
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeerDetails);
+
+ Map<Symbol, Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+ serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(serverConnectionProperties);
+ primaryPeer.expectBegin();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial
+ // peer and the backup1
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(primaryPeerURI);
+ afterOpenFailoverURIs.add(backupPeerURI);
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ // Verify the client fails over to the advertised backup, and uses
+ // the correct AMQP hostname when doing so
+ backupPeer.expectSaslAnonymous();
+ backupPeer.expectOpen();
+ backupPeer.expectBegin();
+
+ primaryPeer.close();
+
+ assertTrue("Should connect to backup peer", connectedToBackup.await(5, TimeUnit.SECONDS));
+
+ backupPeer.waitForAllHandlersToComplete(3000);
+
+ backupPeer.expectClose();
+ connection.close();
+ backupPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via SSL
+ * that a remote listed in the open frame failover list is ignored when insecure redirects are
+ * prohibited.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverIgnoresInsecureServerWhenNotConfiguredToAllow() throws Exception {
+ doTestFailoverHandlingOfInsecureRedirectAdvertisement(false);
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via SSL
+ * that a remote listed in the open frame failover list is accepted when insecure redirects are
+ * allowed.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverAcceptsInsecureServerWhenConfiguredToAllow() throws Exception {
+ doTestFailoverHandlingOfInsecureRedirectAdvertisement(true);
+ }
+
+ private void doTestFailoverHandlingOfInsecureRedirectAdvertisement(boolean allow) throws Exception {
+
+ TransportSslOptions serverSslOptions = new TransportSslOptions();
+ serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+ serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+ serverSslOptions.setKeyStorePassword(PASSWORD);
+ serverSslOptions.setTrustStorePassword(PASSWORD);
+ serverSslOptions.setVerifyHost(false);
+
+ SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+ setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false)) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is created
+ primaryPeer.expectSaslAnonymous();
+
+ String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=" + allow;
+ String connectionOptions = "amqp.allowNonSecureRedirects=" + allow;
+
+ // We only give it the primary/dropping peer details. It can only connect to the backup
+ // peer by identifying the details in the announced failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover list containing the backup advertised
+ Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+ backupPeerDetails.put(NETWORK_HOST, "localhost");
+ backupPeerDetails.put(PORT, 5673);
+ backupPeerDetails.put(SCHEME, "amqp");
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeerDetails);
+
+ Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+ serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(serverConnectionProperties);
+ primaryPeer.expectBegin();
+ primaryPeer.expectClose();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial peer and the backup1
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+ if (allow) {
+ afterOpenFailoverURIs.add(new URI("amqp://localhost:5673?" + connectionOptions));
+ }
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ connection.close();
+
+ primaryPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via
+ * the 'amqp' transport and the redirect contains a 'ws' scheme that failover reconnect list
+ * is updated to contain the 'amqpws' redirect.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverAcceptsUpdateUsingTransportSchemeWS() throws Exception {
+ doTestFailoverAcceptsUpdateUsingTransportSchemes("ws", "amqpws");
+ }
+
+ /*
+ * Verify that when the Open frame contains a failover server list and we are connected via
+ * the 'amqp' transport and the redirect contains a 'ws' scheme that failover reconnect list
+ * is updated to contain the 'amqpws' redirect.
+ */
+ @Test(timeout = 20000)
+ public void testFailoverAcceptsUpdateUsingTransportSchemeWSS() throws Exception {
+ doTestFailoverAcceptsUpdateUsingTransportSchemes("wss", "amqpwss");
+ }
+
+ private void doTestFailoverAcceptsUpdateUsingTransportSchemes(String transportScheme, String expected) throws Exception {
+
+ TransportSslOptions serverSslOptions = new TransportSslOptions();
+ serverSslOptions.setKeyStoreLocation(BROKER_JKS_KEYSTORE);
+ serverSslOptions.setTrustStoreLocation(BROKER_JKS_TRUSTSTORE);
+ serverSslOptions.setKeyStorePassword(PASSWORD);
+ serverSslOptions.setTrustStorePassword(PASSWORD);
+ serverSslOptions.setVerifyHost(false);
+
+ SSLContext serverSslContext = TransportSupport.createSslContext(serverSslOptions);
+
+ setSslSystemPropertiesForCurrentTest(CLIENT_JKS_KEYSTORE, PASSWORD, CLIENT_JKS_TRUSTSTORE, PASSWORD);
+
+ try (TestAmqpPeer primaryPeer = new TestAmqpPeer(serverSslContext, false)) {
+
+ final URI primaryPeerURI = createPeerURI(primaryPeer);
+ LOG.info("Primary is at: {}", primaryPeerURI);
+
+ final CountDownLatch connectedToPrimary = new CountDownLatch(1);
+
+ // Expect the authentication as soon as the connection object is created
+ primaryPeer.expectSaslAnonymous();
+
+ // Allow non-secure redirects for this test for simplicity.
+ String failoverOptions = "failover.nested.amqp.allowNonSecureRedirects=true";
+ String connectionOptions = "amqp.allowNonSecureRedirects=true";
+
+ // We only give it the primary/dropping peer details. It can only connect to the backup
+ // peer by identifying the details in the announced failover-server-list.
+ final JmsConnection connection = establishAnonymousConnecton(null, failoverOptions, true, primaryPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (isExpectedHost(primaryPeerURI, remoteURI)) {
+ connectedToPrimary.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Reestablished: {}", remoteURI);
+ }
+ });
+
+ // Verify the existing failover URIs are as expected, the initial peer only
+ List<URI> beforeOpenFailoverURIs = new ArrayList<>();
+ beforeOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+
+ assertFailoverURIList(connection, beforeOpenFailoverURIs);
+
+ // Set the primary up to expect the connection, have the failover list containing the backup advertised
+ Map<Symbol,Object> backupPeerDetails = new HashMap<>();
+ backupPeerDetails.put(NETWORK_HOST, "localhost");
+ backupPeerDetails.put(PORT, 5673);
+ backupPeerDetails.put(SCHEME, transportScheme);
+
+ List<Map<Symbol, Object>> failoverServerList = new ArrayList<Map<Symbol, Object>>();
+ failoverServerList.add(backupPeerDetails);
+
+ Map<Symbol,Object> serverConnectionProperties = new HashMap<Symbol, Object>();
+ serverConnectionProperties.put(FAILOVER_SERVER_LIST, failoverServerList);
+
+ primaryPeer.expectOpen(serverConnectionProperties);
+ primaryPeer.expectBegin();
+ primaryPeer.expectClose();
+
+ // Provoke the actual AMQP connection
+ connection.start();
+
+ assertTrue("Should connect to primary peer", connectedToPrimary.await(5, TimeUnit.SECONDS));
+
+ // Verify the failover URIs are as expected, now containing initial peer and the backup1
+ List<URI> afterOpenFailoverURIs = new ArrayList<>();
+ afterOpenFailoverURIs.add(new URI(primaryPeerURI + "?" + connectionOptions));
+ afterOpenFailoverURIs.add(new URI(expected + "://localhost:5673?" + connectionOptions));
+
+ assertFailoverURIList(connection, afterOpenFailoverURIs);
+
+ connection.close();
+
+ primaryPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ private void setSslSystemPropertiesForCurrentTest(String keystore, String keystorePassword, String truststore, String truststorePassword) {
+ setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE, keystore);
+ setTestSystemProperty(JAVAX_NET_SSL_KEY_STORE_PASSWORD, keystorePassword);
+ setTestSystemProperty(JAVAX_NET_SSL_TRUST_STORE, truststore);
+ setTestSystemProperty(JAVAX_NET_SSL_TRUST_STORE_PASSWORD, truststorePassword);
+ }
+
+ private void assertFailoverURIList(JmsConnection connection, List<URI> expectedURIs) throws Exception {
+ FailoverProvider provider = getFailoverProvider(connection);
+
+ Field urisField = provider.getClass().getDeclaredField("uris");
+ urisField.setAccessible(true);
+ Object urisObj = urisField.get(provider);
+
+ assertNotNull("Expected to get a uri pool instance", urisObj);
+ assertTrue("Unexpected uri pool type: " + urisObj.getClass(), urisObj instanceof FailoverUriPool);
+ FailoverUriPool uriPool = (FailoverUriPool) urisObj;
+
+ List<URI> current = uriPool.getList();
+ assertEquals(expectedURIs, current);
+ }
+
+ private FailoverProvider getFailoverProvider(JmsConnection connection) throws Exception {
+ Field field = connection.getClass().getDeclaredField("provider");
+ field.setAccessible(true);
+ Object providerObj = field.get(connection);
+
+ assertNotNull("Expected to get a provdier instance", providerObj);
+ assertTrue("Unexpected provider type: " + providerObj.getClass(), providerObj instanceof FailoverProvider);
+ FailoverProvider provider = (FailoverProvider) providerObj;
+ return provider;
+ }
+
+ private JmsConnection establishAnonymousConnecton(String failoverParams, TestAmqpPeer... peers) throws Exception {
+ return establishAnonymousConnecton(null, failoverParams, peers);
+ }
+
+ private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, TestAmqpPeer... peers) throws Exception {
+ return establishAnonymousConnecton(connectionParams, failoverParams, false, peers);
+ }
+
+ private JmsConnection establishAnonymousConnecton(String connectionParams, String failoverParams, boolean ssl, TestAmqpPeer... peers) throws Exception {
+ if (peers.length == 0) {
+ throw new IllegalArgumentException("No test peers were given, at least 1 required");
+ }
+
+ String remoteURI = "failover:(";
+ boolean first = true;
+ for (TestAmqpPeer peer : peers) {
+ if (!first) {
+ remoteURI += ",";
+ }
+ remoteURI += createPeerURI(peer, connectionParams).toString();
+ first = false;
+ }
+
+ if (failoverParams == null) {
+ remoteURI += ")?failover.maxReconnectAttempts=10";
+ } else {
+ remoteURI += ")" + (failoverParams.startsWith("?") ? "" : "?") + failoverParams;
+ }
+
+ ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+ Connection connection = factory.createConnection();
+
+ return (JmsConnection) connection;
+ }
+
+ private URI createPeerURI(TestAmqpPeer peer) throws Exception {
+ return createPeerURI(peer, null);
+ }
+
+ private URI createPeerURI(TestAmqpPeer peer, String params) throws Exception {
+ String scheme = peer.isSSL() ? "amqps" : "amqp";
+ URI result = new URI(scheme, "localhost:" + peer.getServerPort(), null, null, null);
+
+ Map<String, String> queryParameters = PropertyUtil.parseQuery(params);
+
+ return URISupport.applyParameters(result, queryParameters);
+ }
+
+ private boolean isExpectedHost(URI expected, URI actual) {
+ if (!expected.getHost().equals(actual.getHost())) {
+ LOG.info("Expected host {} but got host {}", expected.getHost(), actual.getHost());
+ return false;
+ }
+
+ if (expected.getPort() != actual.getPort()) {
+ LOG.info("Expected host {} on port {} but got host {} on port {}",
+ expected.getHost(), expected.getPort(), actual.getHost(), actual.getPort());
+ return false;
+ }
+
+ return true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index f751beb..719fbc7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -19,8 +19,8 @@
package org.apache.qpid.jms.test.testpeer;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
-import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.GLOBAL;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
@@ -227,6 +227,11 @@ public class TestAmqpPeer implements AutoCloseable
return _driverRunnable.getClientSocket();
}
+ public boolean isSSL()
+ {
+ return _driverRunnable.isSSL();
+ }
+
public int getAdvertisedIdleTimeout()
{
return advertisedIdleTimeout;
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 0f01256..eb0b2a0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -258,4 +258,8 @@ class TestAmqpPeerRunner implements Runnable
public boolean isSendSaslHeaderPreEmptively() {
return _sendSaslHeaderPreEmptively;
}
+
+ public boolean isSSL() {
+ return _serverSocket instanceof SSLServerSocket;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
index 718c422..14418ef 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpToMockServerTest.java
@@ -217,7 +217,7 @@ public class NettyTcpToMockServerTest extends QpidJmsTestCase {
}
}
- @Test(timeout = 60 * 1000)
+ @Test(timeout = 20 * 1000)
public void testConnectToWSServerWhenRedirectedWithNewPath() throws Exception {
try (NettySimpleAmqpServer primary = createWSServer(createServerOptions());
NettySimpleAmqpServer redirect = createWSServer(createServerOptions())) {
@@ -314,7 +314,7 @@ public class NettyTcpToMockServerTest extends QpidJmsTestCase {
protected URI createFailoverURI(NettyServer server) throws Exception {
URI serverURI = createConnectionURI(server, null);
- String failoverURI = "failover:(" + serverURI.toString() + ")";
+ String failoverURI = "failover:(" + serverURI.toString() + "?amqp.vhost=localhost)?failover.maxReconnectAttempts=3";
return new URI(failoverURI);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
index ba2674e..3e16c50 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/URISupportTest.java
@@ -272,14 +272,29 @@ public class URISupportTest {
parameters.put("t.proxyHost", "localhost");
parameters.put("t.proxyPort", "80");
+ uri = URISupport.applyParameters(uri, parameters, "t.");
+ Map<String,String> appliedParameters = URISupport.parseParameters(uri);
+ assertEquals("all params applied with no prefix", 3, appliedParameters.size());
+ verifyParams(appliedParameters);
+ }
+
+ @Test
+ public void testApplyParametersOverwritesOriginalParameters() throws Exception {
+ URI uri = new URI("http://0.0.0.0:61616?proxyHost=host&proxyPort=21&timeout=1000");
+
+ Map<String,String> parameters = new HashMap<String, String>();
+ parameters.put("proxyHost", "localhost");
+ parameters.put("proxyPort", "80");
+
uri = URISupport.applyParameters(uri, parameters);
Map<String,String> appliedParameters = URISupport.parseParameters(uri);
assertEquals("all params applied with no prefix", 3, appliedParameters.size());
+ verifyParams(appliedParameters);
}
private void verifyParams(Map<String,String> parameters) {
- assertEquals(parameters.get("proxyHost"), "localhost");
- assertEquals(parameters.get("proxyPort"), "80");
+ assertEquals("localhost", parameters.get("proxyHost"));
+ assertEquals("80", parameters.get("proxyPort"));
}
//---- isCompositeURI ----------------------------------------------------//
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org