You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/10/09 20:35:21 UTC
qpid-jms git commit: QPIDJMS-336 Updates to failover reconnect
behavior
Repository: qpid-jms
Updated Branches:
refs/heads/master 267806dc0 -> 9c7c10dbe
QPIDJMS-336 Updates to failover reconnect behavior
Perform a non-delayed reconnection attempt to each URI in the
failover list before pausing based on configuration in between
each cycle of reconnection attempts.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9c7c10db
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9c7c10db
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9c7c10db
Branch: refs/heads/master
Commit: 9c7c10dbef607cf3043c6613c303b3d3858070c7
Parents: 267806d
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 5 11:37:35 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Oct 9 16:25:47 2017 -0400
----------------------------------------------------------------------
.../jms/provider/failover/FailoverProvider.java | 48 ++++++++++++--------
.../jms/provider/failover/FailoverUriPool.java | 7 +++
.../provider/failover/FailoverProviderTest.java | 32 ++++++++++++-
3 files changed, 65 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9c7c10db/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index b82c6f6..28714c2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -682,26 +682,33 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
reconnectAttempts++;
Throwable failure = null;
- URI target = uris.getNext();
- if (target != null) {
- Provider provider = null;
- try {
- LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts, target);
- provider = ProviderFactory.create(target);
- provider.connect(connectionInfo);
- initializeNewConnection(provider);
- return;
- } catch (Throwable e) {
- LOG.info("Connection attempt:[{}] to: {} failed", reconnectAttempts, target);
- failure = e;
+ if (!uris.isEmpty()) {
+ for (int i = 0; i < uris.size(); ++i) {
+ URI target = uris.getNext();
+ if (target == null) {
+ LOG.warn("Failover URI collection unexpectedly modified during connection attempt.");
+ continue;
+ }
+
+ Provider provider = null;
try {
- if (provider != null) {
- provider.close();
- }
- } catch (Throwable ex) {}
+ LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts, target);
+ provider = ProviderFactory.create(target);
+ provider.connect(connectionInfo);
+ initializeNewConnection(provider);
+ return;
+ } catch (Throwable e) {
+ LOG.info("Connection attempt:[{}] to: {} failed", reconnectAttempts, target);
+ failure = e;
+ try {
+ if (provider != null) {
+ provider.close();
+ }
+ } catch (Throwable ex) {}
+ }
}
} else {
- LOG.debug("No target URI available to connect to");
+ LOG.debug("No remote URI available to connect to in failover list");
}
if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) {
@@ -866,6 +873,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
// as it is meant for the failover nodes. The pool will de-dup if it is.
newRemotes.add(0, connectedURI);
try {
+ LOG.info("Replacing uris:{} with new set: {}", uris, newRemotes);
uris.replaceAll(newRemotes);
} catch (Throwable err) {
LOG.warn("Error while attempting to add discovered URIs: {}", discovered);
@@ -875,7 +883,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
// Do Nothing
break;
default:
- // Shouldnt get here, but do nothing if we do.
+ // Shouldn't get here, but do nothing if we do.
break;
}
@@ -891,7 +899,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
//--------------- URI update and rebalance methods -----------------------//
public void add(final URI uri) {
- serializer.execute(new Runnable() {
+ connectionHub.execute(new Runnable() {
@Override
public void run() {
uris.add(uri);
@@ -900,7 +908,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
}
public void remove(final URI uri) {
- serializer.execute(new Runnable() {
+ connectionHub.execute(new Runnable() {
@Override
public void run() {
uris.remove(uri);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9c7c10db/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
index 2227141..f690da2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
@@ -284,6 +284,13 @@ public class FailoverUriPool {
return nestedOptions;
}
+ @Override
+ public String toString() {
+ synchronized (uris) {
+ return "URI Pool { " + uris + " }";
+ }
+ }
+
//----- Internal methods that require the locks be held ------------------//
private boolean contains(URI newURI) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9c7c10db/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
index 031d668..0f3b5d7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
@@ -231,9 +231,9 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
}
@Test(timeout = 30000)
- public void testMaxReconnectAttempts() throws Exception {
+ public void testMaxReconnectAttemptsWithOneURI() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(
- "failover:(mock://localhost?mock.failOnConnect=true)" +
+ "failover:(mock://localhost1?mock.failOnConnect=true)" +
"?failover.maxReconnectAttempts=5" +
"&failover.useReconnectBackOff=false");
@@ -255,6 +255,34 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
}
@Test(timeout = 30000)
+ public void testMaxReconnectAttemptsWithMultipleURIs() throws Exception {
+ JmsConnectionFactory factory = new JmsConnectionFactory(
+ "failover:(mock://localhost1?mock.failOnConnect=true," +
+ "mock://localhost2?mock.failOnConnect=true," +
+ "mock://localhost3?mock.failOnConnect=true)" +
+ "?failover.maxReconnectAttempts=5" +
+ "&failover.reconnectDelay=1" +
+ "&failover.useReconnectBackOff=false");
+
+ Connection connection = null;
+ try {
+ connection = factory.createConnection();
+ connection.start();
+ fail("Should have stopped after five retries.");
+ } catch (JMSException ex) {
+ } finally {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ // The number should scale by the number of URIs in the list
+ assertEquals(15, mockPeer.getContextStats().getProvidersCreated());
+ assertEquals(15, mockPeer.getContextStats().getConnectionAttempts());
+ assertEquals(15, mockPeer.getContextStats().getCloseAttempts());
+ }
+
+ @Test(timeout = 30000)
public void testMaxReconnectAttemptsWithBackOff() throws Exception {
JmsConnectionFactory factory = new JmsConnectionFactory(
"failover:(mock://localhost?mock.failOnConnect=true)" +
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org