You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/03/21 21:09:43 UTC

qpid-jms git commit: QPIDJMS-369 Apply reconnect attempts limit on failed initial connect

Repository: qpid-jms
Updated Branches:
  refs/heads/master 054e24c58 -> 28f0e8083


QPIDJMS-369 Apply reconnect attempts limit on failed initial connect

If the client has never connected, apply the reconnect attempts limit
properly when the remote is closing on the Open performative.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/28f0e808
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/28f0e808
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/28f0e808

Branch: refs/heads/master
Commit: 28f0e808351e4e765d85b322efa121855461e1b0
Parents: 054e24c
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 21 17:09:23 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 21 17:09:23 2018 -0400

----------------------------------------------------------------------
 .../jms/provider/failover/FailoverProvider.java | 57 +++++++-------
 .../failover/FailoverIntegrationTest.java       | 79 ++++++++++++++++++--
 2 files changed, 98 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/28f0e808/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index d8a1497..6c27af8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -614,18 +614,24 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
                         // Stage 5: Let the client know that connection has restored.
                         listener.onConnectionRestored(provider.getRemoteURI());
+
+                        // Last step: Send pending actions.
+                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        for (FailoverRequest request : pending) {
+                            request.run();
+                        }
+
+                        reconnectControl.connectionEstablished();
                     } else {
                         processAlternates(provider.getAlternateURIs());
-                    }
 
-                    // Last step: Send pending actions.
-                    List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
-                    for (FailoverRequest request : pending) {
-                        request.run();
+                        // Last step: Send pending actions.
+                        List<FailoverRequest> pending = new ArrayList<FailoverRequest>(requests.values());
+                        for (FailoverRequest request : pending) {
+                            request.run();
+                        }
                     }
 
-                    reconnectControl.recordConnected();
-
                     // Cancel timeout processing since we are connected again.  We waited until
                     // now for the case where we are continually getting bounced from otherwise
                     // live servers, we want the timeout to remain scheduled in that case so that
@@ -1201,16 +1207,13 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
         @Override
         public void onSuccess() {
-            serializer.execute(new Runnable() {
-                @Override
-                public void run() {
-                    LOG.trace("First connection requst has completed:");
-                    FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
-                    processAlternates(provider.getAlternateURIs());
-                    listener.onConnectionEstablished(provider.getRemoteURI());
-                    reconnectControl.signalRecoveryRequired();
-                    CreateConnectionRequest.this.signalConnected();
-                }
+            serializer.execute(() -> {
+                LOG.trace("First connection requst has completed:");
+                FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
+                processAlternates(provider.getAlternateURIs());
+                listener.onConnectionEstablished(provider.getRemoteURI());
+                reconnectControl.connectionEstablished();
+                CreateConnectionRequest.this.signalConnected();
             });
         }
 
@@ -1221,14 +1224,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                 super.onFailure(result);
             } else {
                 LOG.debug("Request received error: {}", result.getMessage());
-                serializer.execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        // If we managed to receive an Open frame it might contain
-                        // a failover update so process it before handling the error.
-                        processAlternates(provider.getAlternateURIs());
-                        handleProviderFailure(IOExceptionSupport.create(result));
-                    }
+                serializer.execute(() -> {
+                    // If we managed to receive an Open frame it might contain
+                    // a failover update so process it before handling the error.
+                    processAlternates(provider.getAlternateURIs());
+                    handleProviderFailure(IOExceptionSupport.create(result));
                 });
             }
         }
@@ -1290,7 +1290,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
         }
 
-        public void recordConnected() {
+        public void connectionEstablished() {
+            recoveryRequired = true;
             nextReconnectDelay = -1;
             reconnectAttempts = 0;
             uris.connected();
@@ -1304,10 +1305,6 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             return recoveryRequired;
         }
 
-        public void signalRecoveryRequired() {
-            recoveryRequired = true;
-        }
-
         public boolean isLimitExceeded() {
             int reconnectLimit = reconnectAttemptLimit();
             if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/28f0e808/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index defbc2f..e12c312 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -56,6 +56,7 @@ import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.JmsConnectionFactory;
 import org.apache.qpid.jms.JmsDefaultConnectionListener;
 import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.JmsResourceNotFoundException;
 import org.apache.qpid.jms.JmsSendTimedOutException;
 import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -1648,12 +1649,12 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             final CountDownLatch failedConnection = new CountDownLatch(1);
 
             // Create a peer to connect to, then one to reconnect to
-            final String testPeerURI = createPeerURI(firstPeer);
+            final String firstPeerURI = createPeerURI(firstPeer);
 
-            LOG.info("First peer is at: {}", firstPeer);
-            LOG.info("Second peer is at: {}", secondPeer);
-            LOG.info("Third peer is at: {}", thirdPeer);
-            LOG.info("Fourth peer is at: {}", fourthPeer);
+            LOG.info("First peer is at: {}", firstPeerURI);
+            LOG.info("Second peer is at: {}", createPeerURI(secondPeer));
+            LOG.info("Third peer is at: {}", createPeerURI(thirdPeer));
+            LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer));
 
             firstPeer.expectSaslAnonymous();
             firstPeer.expectOpen();
@@ -1676,7 +1677,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
                 @Override
                 public void onConnectionEstablished(URI remoteURI) {
                     LOG.info("Connection Established: {}", remoteURI);
-                    if (testPeerURI.equals(remoteURI.toString())) {
+                    if (firstPeerURI.equals(remoteURI.toString())) {
                         testConnected.countDown();
                     }
                 }
@@ -1689,7 +1690,68 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             });
             connection.start();
 
-            assertTrue("Should connect to test peer", testConnected.await(5, TimeUnit.SECONDS));
+            assertTrue("Should connect to first peer", testConnected.await(5, TimeUnit.SECONDS));
+
+            // --- Failover should handle the connection close ---------------//
+
+            assertTrue("Should reported failed", failedConnection.await(5, TimeUnit.SECONDS));
+
+            try {
+                connection.close();
+            } catch (JMSException jmsEx) {}
+
+            secondPeer.waitForAllHandlersToCompleteNoAssert(2000);
+            thirdPeer.waitForAllHandlersToCompleteNoAssert(2000);
+
+            try {
+                fourthPeer.purgeExpectations();
+                fourthPeer.close();
+                fail("Should have not executed any handlers.");
+            } catch (Throwable t) {}
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testStartMaxReconnectAttemptsTriggeredWhenRemotesAreRejecting() throws Exception {
+        try (TestAmqpPeer firstPeer = new TestAmqpPeer();
+             TestAmqpPeer secondPeer = new TestAmqpPeer();
+             TestAmqpPeer thirdPeer = new TestAmqpPeer();
+             TestAmqpPeer fourthPeer = new TestAmqpPeer()) {
+
+            final CountDownLatch failedConnection = new CountDownLatch(1);
+
+            LOG.info("First peer is at: {}", createPeerURI(firstPeer));
+            LOG.info("Second peer is at: {}", createPeerURI(secondPeer));
+            LOG.info("Third peer is at: {}", createPeerURI(thirdPeer));
+            LOG.info("Fourth peer is at: {}", createPeerURI(fourthPeer));
+
+            firstPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
+            secondPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
+            thirdPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
+
+            // This shouldn't get hit, but if it does accept the connect so we don't pass the failed
+            // to connect assertion.
+            fourthPeer.expectSaslAnonymous();
+            fourthPeer.expectOpen();
+            fourthPeer.expectBegin();
+            fourthPeer.expectClose();
+
+            final JmsConnection connection = establishAnonymousConnecton(
+                "failover.startupMaxReconnectAttempts=3&failover.reconnectDelay=15&failover.useReconnectBackOff=false",
+                firstPeer, secondPeer, thirdPeer, fourthPeer);
+            connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+                @Override
+                public void onConnectionFailure(Throwable cause) {
+                    LOG.info("Connection Failed: {}", cause);
+                    failedConnection.countDown();
+                }
+            });
+
+            try {
+                connection.start();
+                fail("Should not be able to connect");
+            } catch (JmsResourceNotFoundException jmsrnfe) {}
 
             // --- Failover should handle the connection close ---------------//
 
@@ -1699,8 +1761,9 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
                 connection.close();
             } catch (JMSException jmsEx) {}
 
+            firstPeer.waitForAllHandlersToCompleteNoAssert(2000);
             secondPeer.waitForAllHandlersToCompleteNoAssert(2000);
-            thirdPeer.waitForAllHandlersToComplete(2000);
+            thirdPeer.waitForAllHandlersToCompleteNoAssert(2000);
 
             try {
                 fourthPeer.purgeExpectations();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org