You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/10 19:02:36 UTC

[activemq-artemis] branch master updated: ARTEMIS-2479 Initial connection will not work with infinite retry and multiple nodes

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 60b6294  ARTEMIS-2479 Initial connection will not work with infinite retry and multiple nodes
     new b583ba7  This closes #2830
60b6294 is described below

commit 60b62940b9887b962efd7f2e54ba324042e67c0b
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Mon Sep 9 15:36:06 2019 -0400

    ARTEMIS-2479 Initial connection will not work with infinite retry and multiple nodes
---
 .../core/client/impl/ClientSessionFactoryImpl.java | 21 ++++--
 .../client/impl/ClientSessionFactoryInternal.java  |  2 +
 .../core/client/impl/ServerLocatorImpl.java        | 49 ++++++++++----
 .../integration/client/InitialConnectionTest.java  | 79 ++++++++++++++++++++++
 .../integration/ra/ActiveMQClusteredTest.java      | 37 +++++-----
 5 files changed, 147 insertions(+), 41 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
index 47237b5..4ca1af1 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
@@ -826,13 +826,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
                   ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);
                }
 
-               try {
-                  if (clientProtocolManager.waitOnLatch(interval)) {
-                     return;
-                  }
-               } catch (InterruptedException ignore) {
-                  throw new ActiveMQInterruptedException(createTrace);
-               }
+               if (waitForRetry(interval))
+                  return;
 
                // Exponential back-off
                long newInterval = (long) (interval * retryIntervalMultiplier);
@@ -850,6 +845,18 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
       }
    }
 
+   @Override
+   public boolean waitForRetry(long interval) {
+      try {
+         if (clientProtocolManager.waitOnLatch(interval)) {
+            return true;
+         }
+      } catch (InterruptedException ignore) {
+         throw new ActiveMQInterruptedException(createTrace);
+      }
+      return false;
+   }
+
    private void cancelScheduledTasks() {
       Future<?> pingerFutureLocal = pingerFuture;
       if (pingerFutureLocal != null) {
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
index f72ca5e..408c8e7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryInternal.java
@@ -60,4 +60,6 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
    ConfirmationWindowWarning getConfirmationWindowWarning();
 
    Lock lockFailover();
+
+   boolean waitForRetry(long interval);
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
index a05a8af..fd3507e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
@@ -563,6 +563,24 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       }
    }
 
+   private int getConnectorsSize() {
+      Pair<TransportConfiguration, TransportConfiguration>[] usedTopology;
+
+      flushTopology();
+
+      synchronized (topologyArrayGuard) {
+         usedTopology = topologyArray;
+      }
+
+      synchronized (this) {
+         if (usedTopology != null && useTopologyForLoadBalancing) {
+            return usedTopology.length;
+         } else {
+            return initialConnectors.length;
+         }
+      }
+   }
+
    @Override
    public void start(Executor executor) throws Exception {
       initialize();
@@ -764,9 +782,9 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
       ClientSessionFactoryInternal factory = null;
 
       synchronized (this) {
-         boolean retry;
+         boolean retry = true;
          int attempts = 0;
-         do {
+         while (retry && !isClosed()) {
             retry = false;
 
             TransportConfiguration tc = selectConnector();
@@ -780,31 +798,36 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
                factory = new ClientSessionFactoryImpl(this, tc, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);
                try {
                   addToConnecting(factory);
-                  factory.connect(initialConnectAttempts, failoverOnInitialConnection);
+                  // We always try to connect here with only one attempt,
+                  // as we will perform the initial retry here, looking for all possible connectors
+                  factory.connect(1, false);
                } finally {
                   removeFromConnecting(factory);
                }
             } catch (ActiveMQException e) {
-               factory.close();
-               if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
-                  attempts++;
+               try {
+                  if (e.getType() == ActiveMQExceptionType.NOT_CONNECTED) {
+                     attempts++;
 
-                  synchronized (topologyArrayGuard) {
+                     int connectorsSize = getConnectorsSize();
+                     int maxAttempts = initialConnectAttempts == 0 ? 1 : initialConnectAttempts;
 
-                     if (topologyArray != null && attempts == topologyArray.length) {
+                     if (initialConnectAttempts >= 0 && attempts >= maxAttempts * connectorsSize) {
                         throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
                      }
-                     if (topologyArray == null && attempts == this.getNumInitialConnectors()) {
+                     if (factory.waitForRetry(retryInterval)) {
                         throw ActiveMQClientMessageBundle.BUNDLE.cannotConnectToServers();
                      }
+                     retry = true;
+                  } else {
+                     throw e;
                   }
-                  retry = true;
-               } else {
-                  throw e;
+               } finally {
+
+                  factory.close();
                }
             }
          }
-         while (retry);
       }
 
       // ATM topology is never != null. Checking here just to be consistent with
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java
new file mode 100644
index 0000000..dc93db4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/InitialConnectionTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.client;
+
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InitialConnectionTest extends ActiveMQTestBase {
+
+   @Test
+   public void testInitialInfinite() throws Exception {
+      AtomicInteger errors = new AtomicInteger(0);
+
+      ActiveMQServer server = createServer(false, true);
+
+      Thread t = new Thread() {
+         @Override
+         public void run() {
+            try {
+               Thread.sleep(500);
+               server.start();
+            } catch (Throwable e) {
+               e.printStackTrace();
+               errors.incrementAndGet();
+            }
+         }
+      };
+      t.start();
+      ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=-1&useTopologyForLoadBalancing=true");
+      connectionFactory.createConnection().close();
+      connectionFactory.close();
+
+
+      t.join();
+
+      Assert.assertEquals(0, errors.get());
+   }
+
+
+   @Test
+   public void testNegativeMaxTries() throws Exception {
+
+      long timeStart = System.currentTimeMillis();
+      ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("(tcp://localhost:61618,tcp://localhost:61616,tcp://localhost:61610)?ha=true&retryInterval=100&retryIntervalMultiplier=1.0&reconnectAttempts=-1&initialConnectAttempts=2&useTopologyForLoadBalancing=true");
+      boolean failed = false;
+      try {
+         connectionFactory.createConnection();
+      } catch (JMSException e) {
+         // expected
+         failed = true;
+      }
+      Assert.assertTrue(failed);
+      long timeEnd = System.currentTimeMillis();
+      Assert.assertTrue("3 connectors, at 100 milliseconds each try, initialConnectAttempt=2, it should have waited at least 600 (- 100 from the last try that we don't actually wait, just throw ) milliseconds", timeEnd - timeStart >= 500);
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
index 8d886cb..a19b79e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/ra/ActiveMQClusteredTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.ra.ActiveMQRASession;
 import org.apache.activemq.artemis.ra.ActiveMQResourceAdapter;
 import org.apache.activemq.artemis.ra.inflow.ActiveMQActivation;
 import org.apache.activemq.artemis.ra.inflow.ActiveMQActivationSpec;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.junit.Test;
 
 public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
@@ -193,6 +194,7 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
       spec.setMaxSession(CONSUMER_COUNT);
       spec.setSetupAttempts(5);
       spec.setSetupInterval(200L);
+      spec.setRetryInterval(100L);
       spec.setReconnectAttempts(reconnectAttempts);
       spec.setHA(true); // if this isn't true then the topology listener won't get nodeDown notifications
       spec.setCallTimeout(500L); // if this isn't set then it may take a long time for tearDown to occur on the MDB connection
@@ -220,32 +222,25 @@ public class ActiveMQClusteredTest extends ActiveMQRAClusteredTestBase {
       assertNotNull(endpoint.lastMessage);
       assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "test");
 
-      for (int i = 0; i < 10; i++) {
-         secondaryServer.stop();
-
-         long mark = System.currentTimeMillis();
-         long timeout = 5000;
-         while (primaryQueue.getConsumerCount() < CONSUMER_COUNT && (System.currentTimeMillis() - mark) < timeout) {
-            Thread.sleep(100);
-         }
+      try {
+         for (int i = 0; i < 10; i++) {
+            secondaryServer.stop();
 
-         assertTrue(primaryQueue.getConsumerCount() == CONSUMER_COUNT);
+            Wait.assertTrue(() -> primaryQueue.getConsumerCount() == CONSUMER_COUNT);
 
-         secondaryServer.start();
-         waitForServerToStart(secondaryServer);
-         secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
+            secondaryServer.start();
+            waitForServerToStart(secondaryServer);
+            secondaryQueue = secondaryServer.locateQueue(MDBQUEUEPREFIXEDSIMPLE);
 
-         mark = System.currentTimeMillis();
-         while (((primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount()) < (CONSUMER_COUNT) || primaryQueue.getConsumerCount() == CONSUMER_COUNT) && (System.currentTimeMillis() - mark) <= timeout) {
-            Thread.sleep(100);
+            Queue secondaryQueueRef = secondaryQueue;
+            Wait.assertTrue(() -> primaryQueue.getConsumerCount() <= CONSUMER_COUNT);
+            Wait.assertTrue(() -> secondaryQueueRef.getConsumerCount() <= CONSUMER_COUNT);
+            Wait.assertTrue(() -> primaryQueue.getConsumerCount() + secondaryQueueRef.getConsumerCount() == CONSUMER_COUNT);
          }
-
-         assertTrue(primaryQueue.getConsumerCount() < CONSUMER_COUNT);
-         assertTrue(secondaryQueue.getConsumerCount() < CONSUMER_COUNT);
-         assertTrue(primaryQueue.getConsumerCount() + secondaryQueue.getConsumerCount() == CONSUMER_COUNT);
+      } finally {
+         qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+         qResourceAdapter.stop();
       }
 
-      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
-      qResourceAdapter.stop();
    }
 }