You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/03/21 21:09:43 UTC
qpid-jms git commit: QPIDJMS-369 Apply reconnect attempts limit on
failed initial connect
Repository: qpid-jms
Updated Branches:
refs/heads/master 054e24c58 -> 28f0e8083
QPIDJMS-369 Apply reconnect attempts limit on failed initial connect
If never connected then apply the reconnect limit properly when remote
is closing on the Open perfromative.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/28f0e808
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/28f0e808
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/28f0e808
Branch: refs/heads/master
Commit: 28f0e808351e4e765d85b322efa121855461e1b0
Parents: 054e24c
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 21 17:09:23 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 21 17:09:23 2018 -0400
----------------------------------------------------------------------
.../jms/provider/failover/FailoverProvider.java | 57 +++++++-------
.../failover/FailoverIntegrationTest.java | 79 ++++++++++++++++++--
2 files changed, 98 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/28f0e808/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index d8a1497..6c27af8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -614,18 +614,24 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
// Stage 5: Let the client know that connection has restored.
listener.onConnectionRestored(provider.getRemoteURI());
+
+ // Last step: Send pending actions.
+ List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+ for (FailoverRequest request : pending) {
+ request.run();
+ }
+
+ reconnectControl.connectionEstablished();
} else {
processAlternates(provider.getAlternateURIs());
- }
- // Last step: Send pending actions.
- List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
- for (FailoverRequest request : pending) {
- request.run();
+ // Last step: Send pending actions.
+ List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+ for (FailoverRequest request : pending) {
+ request.run();
+ }
}
- reconnectControl.recordConnected();
-
// Cancel timeout processing since we are connected again. We waited until
// now for the case where we are continually getting bounced from otherwise
// live servers, we want the timeout to remain scheduled in that case so that
@@ -1201,16 +1207,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
@Override
public void onSuccess() {
- serializer.execute(new Runnable() {
- @Override
- public void run() {
- LOG.trace("First connection requst has completed:");
- FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
- processAlternates(provider.getAlternateURIs());
- listener.onConnectionEstablished(provider.getRemoteURI());
- reconnectControl.signalRecoveryRequired();
- CreateConnectionRequest.this.signalConnected();
- }
+ serializer.execute(() -> {
+ LOG.trace("First connection requst has completed:");
+ FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
+ processAlternates(provider.getAlternateURIs());
+ listener.onConnectionEstablished(provider.getRemoteURI());
+ reconnectControl.connectionEstablished();
+ CreateConnectionRequest.this.signalConnected();
});
}
@@ -1221,14 +1224,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
super.onFailure(result);
} else {
LOG.debug("Request received error: {}", result.getMessage());
- serializer.execute(new Runnable() {
- @Override
- public void run() {
- // If we managed to receive an Open frame it might contain
- // a failover update so process it before handling the error.
- processAlternates(provider.getAlternateURIs());
- handleProviderFailure(IOExceptionSupport.create(result));
- }
+ serializer.execute(() -> {
+ // If we managed to receive an Open frame it might contain
+ // a failover update so process it before handling the error.
+ processAlternates(provider.getAlternateURIs());
+ handleProviderFailure(IOExceptionSupport.create(result));
});
}
}
@@ -1290,7 +1290,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
}
}
- public void recordConnected() {
+ public void connectionEstablished() {
+ recoveryRequired = true;
nextReconnectDelay = -1;
reconnectAttempts = 0;
uris.connected();
@@ -1304,10 +1305,6 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
return recoveryRequired;
}
- public void signalRecoveryRequired() {
- recoveryRequired = true;
- }
-
public boolean isLimitExceeded() {
int reconnectLimit = reconnectAttemptLimit();
if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/28f0e808/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index defbc2f..e12c312 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -56,6 +56,7 @@ import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.JmsResourceNotFoundException;
import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -1648,12 +1649,12 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
final CountDownLatch failedConnection = new CountDownLatch(1);
// Create a peer to connect to, then one to reconnect to
- final String testPeerURI = createPeerURI(firstPeer);
+ final String firstPeerURI = createPeerURI(firstPeer);
- LOG.info("First peer is at: {}", firstPeer);
- LOG.info("Second peer is at: {}", secondPeer);
- LOG.info("Third peer is at: {}", thirdPeer);
- LOG.info("Fourth peer is at: {}", fourthPeer);
+ LOG.info("First peer is at: {}", firstPeerURI);
+ LOG.info("Second peer is at: {}", createPeerURI(secondPeer));
+ LOG.info("Third peer is at: {}", createPeerURI(thirdPeer));
+ LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer));
firstPeer.expectSaslAnonymous();
firstPeer.expectOpen();
@@ -1676,7 +1677,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
@Override
public void onConnectionEstablished(URI remoteURI) {
LOG.info("Connection Established: {}", remoteURI);
- if (testPeerURI.equals(remoteURI.toString())) {
+ if (firstPeerURI.equals(remoteURI.toString())) {
testConnected.countDown();
}
}
@@ -1689,7 +1690,68 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
});
connection.start();
- assertTrue("Should connect to test peer", testConnected.await(5, TimeUnit.SECONDS));
+ assertTrue("Should connect to first peer", testConnected.await(5, TimeUnit.SECONDS));
+
+ // --- Failover should handle the connection close ---------------//
+
+ assertTrue("Should reported failed", failedConnection.await(5, TimeUnit.SECONDS));
+
+ try {
+ connection.close();
+ } catch (JMSException jmsEx) {}
+
+ secondPeer.waitForAllHandlersToCompleteNoAssert(2000);
+ thirdPeer.waitForAllHandlersToCompleteNoAssert(2000);
+
+ try {
+ fourthPeer.purgeExpectations();
+ fourthPeer.close();
+ fail("Should have not executed any handlers.");
+ } catch (Throwable t) {}
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testStartMaxReconnectAttemptsTriggeredWhenRemotesAreRejecting() throws Exception {
+ try (TestAmqpPeer firstPeer = new TestAmqpPeer();
+ TestAmqpPeer secondPeer = new TestAmqpPeer();
+ TestAmqpPeer thirdPeer = new TestAmqpPeer();
+ TestAmqpPeer fourthPeer = new TestAmqpPeer()) {
+
+ final CountDownLatch failedConnection = new CountDownLatch(1);
+
+ LOG.info("First peer is at: {}", createPeerURI(firstPeer));
+ LOG.info("Second peer is at: {}", createPeerURI(secondPeer));
+ LOG.info("Third peer is at: {}", createPeerURI(thirdPeer));
+ LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer));
+
+ firstPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
+ secondPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
+ thirdPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
+
+ // This shouldn't get hit, but if it does accept the connect so we don't pass the failed
+ // to connect assertion.
+ fourthPeer.expectSaslAnonymous();
+ fourthPeer.expectOpen();
+ fourthPeer.expectBegin();
+ fourthPeer.expectClose();
+
+ final JmsConnection connection = establishAnonymousConnecton(
+ "failover.startupMaxReconnectAttempts=3&failover.reconnectDelay=15&failover.useReconnectBackOff=false",
+ firstPeer, secondPeer, thirdPeer, fourthPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+ @Override
+ public void onConnectionFailure(Throwable cause) {
+ LOG.info("Connection Failed: {}", cause);
+ failedConnection.countDown();
+ }
+ });
+
+ try {
+ connection.start();
+ fail("Should not be able to connect");
+ } catch (JmsResourceNotFoundException jmsrnfe) {}
// --- Failover should handle the connection close ---------------//
@@ -1699,8 +1761,9 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
connection.close();
} catch (JMSException jmsEx) {}
+ firstPeer.waitForAllHandlersToCompleteNoAssert(2000);
secondPeer.waitForAllHandlersToCompleteNoAssert(2000);
- thirdPeer.waitForAllHandlersToComplete(2000);
+ thirdPeer.waitForAllHandlersToCompleteNoAssert(2000);
try {
fourthPeer.purgeExpectations();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org