You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/03/14 19:05:21 UTC

qpid-jms git commit: QPIDJMS-365 Correct failover reconnect delays when remote closes

Repository: qpid-jms
Updated Branches:
  refs/heads/master 39570cfbf -> daec04641


QPIDJMS-365 Correct failover reconnect delays when remote closes

Remember reconnect delay state until the connection is fully reconnected
to avoid restarting delay processing when the remote closes cleanly
after an initial open attempt due to some condition on the remote.  Adds
additional error handling and logging as well as some refactoring of the
overall reconnect handling logic.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/daec0464
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/daec0464
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/daec0464

Branch: refs/heads/master
Commit: daec046413b349f7187ed8490b9a6376c581c848
Parents: 39570cf
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 14 15:05:02 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 14 15:05:02 2018 -0400

----------------------------------------------------------------------
 .../jms/provider/failover/FailoverProvider.java | 312 +++++++++++--------
 .../failover/FailoverIntegrationTest.java       |  17 +-
 2 files changed, 205 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/daec0464/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index 1c6a2bb..25a6739 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -101,10 +101,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>();
 
     // Current state of connection / reconnection
-    private boolean firstAttempt = true;
-    private boolean firstConnection = true;
-    private long reconnectAttempts;
-    private long nextReconnectDelay = -1;
+    private final ReconnectControls reconnectControl = new ReconnectControls();
     private IOException failureCause;
     private URI connectedURI;
     private volatile JmsConnectionInfo connectionInfo;
@@ -542,8 +539,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             }
             provider = null;
 
-            if (reconnectAllowed(cause)) {
-
+            if (reconnectControl.isReconnectAllowed(cause)) {
                 if (cause instanceof ProviderRedirectedException) {
                     ProviderRedirectedException redirect = (ProviderRedirectedException) cause;
                     try {
@@ -596,7 +592,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                     provider.setProviderListener(FailoverProvider.this);
                     connectedURI = provider.getRemoteURI();
 
-                    if (!firstConnection) {
+                    if (reconnectControl.isRecoveryRequired()) {
                         LOG.debug("Signalling connection recovery: {}", provider);
 
                         // Stage 1: Allow listener to recover its resources
@@ -627,9 +623,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                         request.run();
                     }
 
-                    nextReconnectDelay = reconnectDelay;
-                    reconnectAttempts = 0;
-                    uris.connected();
+                    reconnectControl.recordConnected();
 
                     // Cancel timeout processing since we are connected again.  We waited until
                     // now for the case where we are continually getting bounced from otherwise
@@ -641,6 +635,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                     }
 
                 } catch (Throwable error) {
+                    LOG.trace("Connection attempt:[{}] to: {} failed", reconnectControl.reconnectAttempts, provider.getRemoteURI());
                     handleProviderFailure(IOExceptionSupport.create(error));
                 }
             }
@@ -649,8 +644,9 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
     /**
      * Called when the Provider was either first created or when a connection failure has
-     * been reported.  A reconnection attempt is immediately executed on the connection
-     * thread.  If a new Provider is able to be created and connected then a recovery task
+     * been reported.  A reconnection attempt is executed on the connection thread either
+     * immediately or after a delay based on configuration and the number of attempts that have
+     * elapsed.  If a new Provider is able to be created and connected then a recovery task
      * is scheduled on the main serializer thread.  If the connect attempt fails another
      * attempt is scheduled based on the configured delay settings until a max attempts
      * limit is hit, if one is set.
@@ -665,8 +661,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             return;
         }
 
-        connectionHub.execute(new Runnable() {
-            boolean delayed = false;
+        reconnectControl.scheduleReconnect(new Runnable() {
 
             @Override
             public void run() {
@@ -674,116 +669,80 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                     return;
                 }
 
-                int reconnectLimit = reconnectAttemptLimit();
-                if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) {
-                    return;
-                }
-
-                boolean first = firstAttempt;
-                firstAttempt = false;
-
-                if (!delayed && !first && initialReconnectDelay > 0 && reconnectAttempts == 0) {
-                    delayed = true;
-                    LOG.trace("Delayed initial reconnect attempt will be in {} milliseconds", initialReconnectDelay);
-                    connectionHub.schedule(this, initialReconnectDelay, TimeUnit.MILLISECONDS);
-                    return;
-                }
-
-                reconnectAttempts++;
                 Throwable failure = null;
-                if (!uris.isEmpty()) {
-                    for (int i = 0; i < uris.size(); ++i) {
-                        URI target = uris.getNext();
-                        if (target == null) {
-                            LOG.warn("Failover URI collection unexpectedly modified during connection attempt.");
-                            continue;
-                        }
 
-                        Provider provider = null;
-                        try {
-                            LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts, target);
-                            provider = ProviderFactory.create(target);
-                            provider.connect(connectionInfo);
-                            initializeNewConnection(provider);
-                            return;
-                        } catch (Throwable e) {
-                            LOG.info("Connection attempt:[{}] to: {} failed", reconnectAttempts, target);
-                            failure = e;
+                long reconnectAttempts = reconnectControl.recordNextAttempt();
+
+                try {
+                    if (!uris.isEmpty()) {
+                        for (int i = 0; i < uris.size(); ++i) {
+                            URI target = uris.getNext();
+                            if (target == null) {
+                                LOG.warn("Failover URI collection unexpectedly modified during connection attempt.");
+                                continue;
+                            }
+
+                            Provider provider = null;
                             try {
-                                if (provider != null) {
-                                    provider.close();
-                                }
-                            } catch (Throwable ex) {}
+                                LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts, target);
+                                provider = ProviderFactory.create(target);
+                                provider.connect(connectionInfo);
+                                initializeNewConnection(provider);
+                                return;
+                            } catch (Throwable e) {
+                                LOG.info("Connection attempt:[{}] to: {} failed", reconnectAttempts, target);
+                                failure = e;
+                                try {
+                                    if (provider != null) {
+                                        provider.close();
+                                    }
+                                } catch (Throwable ex) {}
+                            }
                         }
-                    }
-                } else {
-                    LOG.debug("No remote URI available to connect to in failover list");
-                }
-
-                if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) {
-                    LOG.error("Failed to connect after: " + reconnectAttempts + " attempt(s)");
-                    failed.set(true);
-                    if (failure == null) {
-                        failureCause = new IOException("Failed to connect after: " + reconnectAttempts + " attempt(s)");
                     } else {
-                        failureCause = IOExceptionSupport.create(failure);
+                        LOG.debug("No remote URI available to connect to in failover list");
+                        failure = new IOException(
+                            "No remote URI available for reconnection during connection attempt: " + reconnectAttempts);
+                    }
+                } catch (Throwable unknownFailure) {
+                    LOG.info("Connection attempt:[{}] failed abnormally.", reconnectAttempts);
+                    failure = failure == null ? unknownFailure : failure;
+                } finally {
+                    if (failure != null) {
+                        if (reconnectControl.isLimitExceeded()) {
+                            reportReconnectFailure(failure);
+                        } else {
+                            reconnectControl.scheduleReconnect(this);
+                        }
                     }
-                    if (listener != null) {
-                        listener.onConnectionFailure(failureCause);
-                    };
-
-                    return;
-                }
-
-                int warnInterval = getWarnAfterReconnectAttempts();
-                if (warnInterval > 0 && (reconnectAttempts % warnInterval) == 0) {
-                    LOG.warn("Failed to connect after: {} attempt(s) continuing to retry.", reconnectAttempts);
                 }
-
-                long delay = nextReconnectDelay();
-                LOG.trace("Next reconnect attempt will be in {} milliseconds", delay);
-                connectionHub.schedule(this, delay, TimeUnit.MILLISECONDS);
             }
         });
     }
 
-    private boolean reconnectAllowed(IOException cause) {
-        // If a connection attempts fail due to Security errors than
-        // we abort reconnection as there is a configuration issue and
-        // we want to avoid a spinning reconnect cycle that can never
-        // complete.
-        if (cause.getCause() instanceof JMSSecurityException) {
-            return false;
-        }
-
-        return reconnectAttemptLimit() != 0;
-    }
-
-    private int reconnectAttemptLimit() {
-        int maxReconnectValue = this.maxReconnectAttempts;
-        if (firstConnection && this.startupMaxReconnectAttempts != UNDEFINED) {
-            // If this is the first connection and a specific startup retry limit
-            // is configured then use it, otherwise use the main reconnect limit
-            maxReconnectValue = this.startupMaxReconnectAttempts;
-        }
-        return maxReconnectValue;
-    }
-
-    private long nextReconnectDelay() {
-
-        if (nextReconnectDelay == -1) {
-            nextReconnectDelay = reconnectDelay;
-        }
-
-        if (isUseReconnectBackOff() && reconnectAttempts > 1) {
-            // Exponential increment of reconnect delay.
-            nextReconnectDelay *= getReconnectBackOffMultiplier();
-            if (nextReconnectDelay > maxReconnectDelay) {
-                nextReconnectDelay = maxReconnectDelay;
+    /**
+     * Called when the reconnection executor has tried for the last time based on max reconnects
+     * configuration and we now consider this connection attempt to be failed.  This method will
+     * run the failure processing on the serializer executor to ensure that all events back to the
+     * client layer happen from the same thread.
+     *
+     * @param lastFailure the last failure encountered while trying to (re)connect.
+     */
+    private void reportReconnectFailure(final Throwable lastFailure) {
+        serializer.execute(() -> {
+            LOG.error("Failed to connect after: " + reconnectControl.reconnectAttempts + " attempt(s)");
+            if (failed.compareAndSet(false, true)) {
+                if (lastFailure == null) {
+                    failureCause = new IOException(
+                        "Failed to connect after: " + reconnectControl.reconnectAttempts + " attempt(s)");
+                } else {
+                    failureCause = IOExceptionSupport.create(lastFailure);
+                }
+                if (listener != null) {
+                    listener.onConnectionFailure(failureCause);
+                };
             }
-        }
-
-        return nextReconnectDelay;
+        });
     }
 
     protected void checkClosed() throws IOException {
@@ -1232,16 +1191,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
             serializer.execute(new Runnable() {
                 @Override
                 public void run() {
-                    if (firstConnection) {
-                        LOG.trace("First connection requst has completed:");
-                        FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
-                        processAlternates(provider.getAlternateURIs());
-                        listener.onConnectionEstablished(provider.getRemoteURI());
-                        firstConnection = false;
-                    } else {
-                        LOG.warn("A second call to a CreateConnectionRequest not expected.");
-                    }
-
+                    LOG.trace("First connection requst has completed:");
+                    FailoverProvider.this.messageFactory.set(provider.getMessageFactory());
+                    processAlternates(provider.getAlternateURIs());
+                    listener.onConnectionEstablished(provider.getRemoteURI());
+                    reconnectControl.signalRecoveryRequired();
                     CreateConnectionRequest.this.signalConnected();
                 }
             });
@@ -1274,4 +1228,118 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     private static enum FailoverServerListAction {
         ADD, REPLACE, IGNORE
     }
+
+    private class ReconnectControls {
+
+        // Reconnection state tracking
+        private volatile boolean recoveryRequired;
+        private volatile long reconnectAttempts;
+        private volatile long nextReconnectDelay = -1;
+
+        public void scheduleReconnect(Runnable runnable) {
+            try {
+                // Warn of ongoing connection attempts if configured.
+                int warnInterval = getWarnAfterReconnectAttempts();
+                if (reconnectAttempts > 0 && warnInterval > 0 && (reconnectAttempts % warnInterval) == 0) {
+                    LOG.warn("Failed to connect after: {} attempt(s) continuing to retry.", reconnectAttempts);
+                }
+
+                // If no connection recovery required then we have never fully connected to a remote
+                // so we proceed down the connect with one immediate connection attempt and then follow
+                // on delayed attempts based on configuration.
+                if (!recoveryRequired) {
+                    if (reconnectAttempts == 0) {
+                        LOG.trace("Initial connect attempt will be performed immediately");
+                        connectionHub.execute(runnable);
+                    } else if (reconnectAttempts == 1 && initialReconnectDelay > 0) {
+                        LOG.trace("Delayed initial reconnect attempt will be in {} milliseconds", initialReconnectDelay);
+                        connectionHub.schedule(runnable, initialReconnectDelay, TimeUnit.MILLISECONDS);
+                    } else {
+                        long delay = reconnectControl.nextReconnectDelay();
+                        LOG.trace("Next reconnect attempt will be in {} milliseconds", delay);
+                        connectionHub.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+                    }
+                } else if (reconnectAttempts == 0) {
+                    if (initialReconnectDelay > 0) {
+                        LOG.trace("Delayed initial reconnect attempt will be in {} milliseconds", initialReconnectDelay);
+                        connectionHub.schedule(runnable, initialReconnectDelay, TimeUnit.MILLISECONDS);
+                    } else {
+                        LOG.trace("Initial Reconnect attempt will be performed immediately");
+                        connectionHub.execute(runnable);
+                    }
+                } else {
+                    long delay = reconnectControl.nextReconnectDelay();
+                    LOG.trace("Next reconnect attempt will be in {} milliseconds", delay);
+                    connectionHub.schedule(runnable, delay, TimeUnit.MILLISECONDS);
+                }
+            } catch (Throwable unrecoverable) {
+                reportReconnectFailure(unrecoverable);
+            }
+        }
+
+        public void recordConnected() {
+            nextReconnectDelay = -1;
+            reconnectAttempts = 0;
+            uris.connected();
+        }
+
+        public long recordNextAttempt() {
+            return ++reconnectAttempts;
+        }
+
+        public boolean isRecoveryRequired() {
+            return recoveryRequired;
+        }
+
+        public void signalRecoveryRequired() {
+            recoveryRequired = true;
+        }
+
+        public boolean isLimitExceeded() {
+            int reconnectLimit = reconnectAttemptLimit();
+            if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) {
+                return true;
+            }
+
+            return false;
+        }
+
+        public boolean isReconnectAllowed(IOException cause) {
+            // If a connection attempt fails due to Security errors then
+            // we abort reconnection as there is a configuration issue and
+            // we want to avoid a spinning reconnect cycle that can never
+            // complete.
+            if (cause.getCause() instanceof JMSSecurityException) {
+                return false;
+            }
+
+            return reconnectAttemptLimit() != 0;
+        }
+
+        private int reconnectAttemptLimit() {
+            int maxReconnectValue = maxReconnectAttempts;
+            if (!recoveryRequired && startupMaxReconnectAttempts != UNDEFINED) {
+                // If this is the first connection attempt and a specific startup retry limit
+                // is configured then use it, otherwise use the main reconnect limit
+                maxReconnectValue = startupMaxReconnectAttempts;
+            }
+            return maxReconnectValue;
+        }
+
+        private long nextReconnectDelay() {
+            if (nextReconnectDelay == -1) {
+                nextReconnectDelay = reconnectDelay;
+            }
+
+            if (isUseReconnectBackOff() && reconnectAttempts > 1) {
+                // Exponential increment of reconnect delay.
+                nextReconnectDelay *= getReconnectBackOffMultiplier();
+                if (nextReconnectDelay > maxReconnectDelay) {
+                    nextReconnectDelay = maxReconnectDelay;
+                }
+            }
+
+            return nextReconnectDelay;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/daec0464/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 8174eea..0abf459 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -16,7 +16,11 @@
  */
 package org.apache.qpid.jms.provider.failover;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
@@ -177,7 +181,11 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             originalPeer.expectOpen();
             originalPeer.expectBegin();
 
-            final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);
+            long ird = 0;
+            long rd = 2000;
+            long start = System.currentTimeMillis();
+
+            final JmsConnection connection = establishAnonymousConnecton("failover.initialReconnectDelay=" + ird + "&failover.reconnectDelay=" + rd + "&failover.maxReconnectAttempts=10", originalPeer, rejectingPeer, finalPeer);
             connection.addConnectionListener(new JmsDefaultConnectionListener() {
                 @Override
                 public void onConnectionEstablished(URI remoteURI) {
@@ -201,7 +209,7 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             assertEquals("should not yet have connected to final peer", 1L, finalConnected.getCount());
 
             // Set expectations on rejecting and final peer
-            rejectingPeer.expectSaslHeaderThenDrop();
+            rejectingPeer.rejectConnect(AmqpError.NOT_FOUND, "Resource could not be located", null);
 
             finalPeer.expectSaslAnonymous();
             finalPeer.expectOpen();
@@ -213,6 +221,11 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
             rejectingPeer.waitForAllHandlersToComplete(2000);
 
             assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+            long end = System.currentTimeMillis();
+
+            long margin = 2000;
+            assertThat("Elapsed time outwith expected range for reconnect", end - start,
+                    both(greaterThanOrEqualTo(ird + rd)).and(lessThanOrEqualTo(ird + rd + margin)));
 
             // Shut it down
             finalPeer.expectClose();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org