You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/09/10 19:03:09 UTC

[activemq-artemis] branch master updated: ARTEMIS-2474 Retry interval ignored on Backup Connectors

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5373c60  ARTEMIS-2474 Retry interval ignored on Backup Connectors
     new 2a2b6bc  This closes #2826
5373c60 is described below

commit 5373c60891a8a45192bfbc869e81e5efe5e2019a
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Fri Sep 6 16:28:42 2019 -0400

    ARTEMIS-2474 Retry interval ignored on Backup Connectors
---
 .../artemis/core/server/cluster/BackupManager.java | 38 ++++++++---
 .../artemis/core/server/impl/InVMNodeManager.java  |  6 +-
 .../artemis/tests/util/ActiveMQTestBase.java       |  2 +-
 .../CheckRetryIntervalBackupManagerTest.java       | 78 ++++++++++++++++++++++
 .../integration/cluster/failover/FailoverTest.java | 24 +++----
 .../jms/cluster/JMSFailoverListenerTest.java       |  9 ++-
 6 files changed, 128 insertions(+), 29 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
index 835cc1f..5aef7ac 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/BackupManager.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
@@ -71,6 +72,12 @@ public class BackupManager implements ActiveMQComponent {
       this.clusterManager = clusterManager;
    }
 
+   /** This is meant for testing and assertions, please don't do anything stupid with it!
+    *  I mean, please don't use it outside of testing context */
+   public List<BackupConnector> getBackupConnectors() {
+      return backupConnectors;
+   }
+
    /*
    * Start the backup manager if not already started. This entails deploying a backup connector based on a cluster
    * configuration, informing the cluster manager so that it can add it to its topology and announce itself to the cluster.
@@ -174,12 +181,12 @@ public class BackupManager implements ActiveMQComponent {
    /*
    * A backup connector will connect to the cluster and announce that we are a backup server ready to fail over.
    * */
-   private abstract class BackupConnector {
+   public abstract class BackupConnector {
 
       private volatile ServerLocatorInternal backupServerLocator;
       private String name;
       private TransportConfiguration connector;
-      private long retryInterval;
+      protected long retryInterval;
       private ClusterManager clusterManager;
       private boolean stopping = false;
       private boolean announcingBackup;
@@ -200,6 +207,11 @@ public class BackupManager implements ActiveMQComponent {
       * */
       abstract ServerLocatorInternal createServerLocator(Topology topology);
 
+      /** This is for test assertions, please be careful, don't use outside of testing! */
+      public ServerLocator getBackupServerLocator() {
+         return backupServerLocator;
+      }
+
       /*
       * start the connector by creating the server locator to use.
       * */
@@ -261,13 +273,7 @@ public class BackupManager implements ActiveMQComponent {
                      return;
                   ActiveMQServerLogger.LOGGER.errorAnnouncingBackup(e);
 
-                  scheduledExecutor.schedule(new Runnable() {
-                     @Override
-                     public void run() {
-                        announceBackup();
-                     }
-
-                  }, retryInterval, TimeUnit.MILLISECONDS);
+                  retryConnection();
                } finally {
                   announcingBackup = false;
                }
@@ -275,6 +281,17 @@ public class BackupManager implements ActiveMQComponent {
          });
       }
 
+      /** it will re-schedule the connection after a timeout, using a scheduled executor */
+      protected void retryConnection() {
+         scheduledExecutor.schedule(new Runnable() {
+            @Override
+            public void run() {
+               announceBackup();
+            }
+
+         }, retryInterval, TimeUnit.MILLISECONDS);
+      }
+
       /*
       * called to notify the cluster manager about the backup
       * */
@@ -341,6 +358,7 @@ public class BackupManager implements ActiveMQComponent {
             }
             ServerLocatorImpl locator = new ServerLocatorImpl(topology, true, tcConfigs);
             locator.setClusterConnection(true);
+            locator.setRetryInterval(retryInterval);
             locator.setProtocolManagerFactory(ActiveMQServerSideProtocolManagerFactory.getInstance(locator));
             return locator;
          }
@@ -372,7 +390,7 @@ public class BackupManager implements ActiveMQComponent {
 
       @Override
       public ServerLocatorInternal createServerLocator(Topology topology) {
-         return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration);
+         return new ServerLocatorImpl(topology, true, discoveryGroupConfiguration).setRetryInterval(retryInterval);
       }
 
       @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
index a0e7601..7af25de 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java
@@ -70,17 +70,17 @@ public final class InVMNodeManager extends NodeManager {
    public void awaitLiveNode() throws Exception {
       do {
          while (state == NOT_STARTED) {
-            Thread.sleep(2000);
+            Thread.sleep(10);
          }
 
          liveLock.acquire();
 
          if (state == PAUSED) {
             liveLock.release();
-            Thread.sleep(2000);
+            Thread.sleep(10);
          } else if (state == FAILING_BACK) {
             liveLock.release();
-            Thread.sleep(2000);
+            Thread.sleep(10);
          } else if (state == LIVE) {
             break;
          }
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index 7538648..bf2af4b 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -586,7 +586,7 @@ public abstract class ActiveMQTestBase extends Assert {
       }
       ClusterConnectionConfiguration clusterConnectionConfiguration = new ClusterConnectionConfiguration().
          setName("cluster1").setAddress("jms").setConnectorName(connectorName).
-         setRetryInterval(1000).setDuplicateDetection(false).setMaxHops(1).
+         setRetryInterval(100).setDuplicateDetection(false).setMaxHops(1).
          setConfirmationWindowSize(1).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).
          setStaticConnectors(connectors0);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/CheckRetryIntervalBackupManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/CheckRetryIntervalBackupManagerTest.java
new file mode 100644
index 0000000..d533588
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/CheckRetryIntervalBackupManagerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
+import org.apache.activemq.artemis.core.server.cluster.BackupManager;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CheckRetryIntervalBackupManagerTest extends FailoverTestBase {
+
+   private static final IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+
+   private volatile CountDownSessionFailureListener listener;
+
+   private volatile ClientSessionFactoryInternal sf;
+
+   private final Object lockFail = new Object();
+
+   @Override
+   protected void createConfigs() throws Exception {
+      nodeManager = createNodeManager();
+      TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+      TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+
+      backupConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(false)).setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).addConnectorConfiguration(liveConnector.getName(), liveConnector).addConnectorConfiguration(backupConnector.getName(), backupConnector).addClusterConfiguration(createBasicClusterConfig(backupConnector.getName(), liveConnector.getName()).setRetryInterval(333));
+
+      backupServer = createTestableServer(backupConfig);
+
+      liveConfig = super.createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)).setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration()).addClusterConfiguration(createBasicClusterConfig(liveConnector.getName()).setRetryInterval(333)).addConnectorConfiguration(liveConnector.getName(), liveConnector);
+
+      liveServer = createTestableServer(liveConfig);
+   }
+
+   @Test
+   public void testValidateRetryInterval() {
+      ActiveMQServerImpl server = (ActiveMQServerImpl) backupServer.getServer();
+      for (BackupManager.BackupConnector backupConnector : server.getBackupManager().getBackupConnectors()) {
+
+         Wait.assertTrue(() -> backupConnector.getBackupServerLocator() != null);
+         Assert.assertEquals(333, backupConnector.getBackupServerLocator().getRetryInterval());
+         Assert.assertEquals(-1, backupConnector.getBackupServerLocator().getReconnectAttempts());
+      }
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
+      return TransportConfigurationUtils.getInVMAcceptor(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
+      return TransportConfigurationUtils.getInVMConnector(live);
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index 4c40448..5042afa 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -64,6 +64,7 @@ import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer
 import org.apache.activemq.artemis.tests.util.CountDownSessionFailureListener;
 import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
 import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.Wait;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -248,10 +249,10 @@ public class FailoverTest extends FailoverTestBase {
 
    @Test(timeout = 120000)
    public void testTimeoutOnFailoverConsumeBlocked() throws Exception {
-      locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
+      locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setConsumerWindowSize(0).setBlockOnDurableSend(true).setAckBatchSize(0).setBlockOnAcknowledge(true).setReconnectAttempts(-1).setAckBatchSize(0);
 
       if (nodeManager instanceof InVMNodeManager) {
-         ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+         ((InVMNodeManager) nodeManager).failoverPause = 2000L;
       }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -343,10 +344,10 @@ public class FailoverTest extends FailoverTestBase {
    // https://issues.jboss.org/browse/HORNETQ-685
    @Test(timeout = 120000)
    public void testTimeoutOnFailoverTransactionCommit() throws Exception {
-      locator.setCallTimeout(5000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
+      locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
 
       if (nodeManager instanceof InVMNodeManager) {
-         ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+         ((InVMNodeManager) nodeManager).failoverPause = 2000L;
       }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -415,7 +416,7 @@ public class FailoverTest extends FailoverTestBase {
       locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
 
       if (nodeManager instanceof InVMNodeManager) {
-         ((InVMNodeManager) nodeManager).failoverPause = 6000L;
+         ((InVMNodeManager) nodeManager).failoverPause = 2000L;
       }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -472,7 +473,7 @@ public class FailoverTest extends FailoverTestBase {
          expected.printStackTrace();
       }
 
-      Thread.sleep(2000);
+      Thread.sleep(1000);
 
       m = null;
       for (int i = 0; i < 500; i++) {
@@ -493,7 +494,7 @@ public class FailoverTest extends FailoverTestBase {
       locator.setCallTimeout(2000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(100);
 
       if (nodeManager instanceof InVMNodeManager) {
-         ((InVMNodeManager) nodeManager).failoverPause = 5000L;
+         ((InVMNodeManager) nodeManager).failoverPause = 1000L;
       }
 
       ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
@@ -533,7 +534,7 @@ public class FailoverTest extends FailoverTestBase {
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
       session.start();
 
-      ClientMessage m = consumer.receive(1000);
+      ClientMessage m = consumer.receiveImmediate();
       Assert.assertNull(m);
 
    }
@@ -708,10 +709,7 @@ public class FailoverTest extends FailoverTestBase {
       liveServer.getServer().start();
 
       Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
-      int i = 0;
-      while (!backupServer.isStarted() && i++ < 100) {
-         Thread.sleep(100);
-      }
+      Wait.assertTrue(backupServer::isStarted);
       liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
       Assert.assertTrue(backupServer.isStarted());
 
@@ -1807,7 +1805,7 @@ public class FailoverTest extends FailoverTestBase {
             message = repeatMessage;
             repeatMessage = null;
          } else {
-            message = consumer.receive(1000);
+            message = consumer.receive(50);
          }
 
          if (message != null) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java
index c695391..4bd0b7c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/cluster/JMSFailoverListenerTest.java
@@ -98,9 +98,12 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
    public void testAutomaticFailover() throws Exception {
       ActiveMQConnectionFactory jbcf = ActiveMQJMSClient.createConnectionFactoryWithHA(JMSFactoryType.CF, livetc);
       jbcf.setReconnectAttempts(-1);
-      jbcf.setRetryInterval(10);
+      jbcf.setRetryInterval(100);
+      jbcf.setConnectionTTL(500);
+      jbcf.setClientFailureCheckPeriod(100);
       jbcf.setBlockOnDurableSend(true);
       jbcf.setBlockOnNonDurableSend(true);
+      jbcf.setCallTimeout(1000);
 
       // Note we set consumer window size to a value so we can verify that consumer credit re-sending
       // works properly on failover
@@ -152,7 +155,7 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
 
       JMSUtil.crash(liveServer, ((ActiveMQSession) sess).getCoreSession());
 
-      Assert.assertEquals(FailoverEventType.FAILURE_DETECTED, listener.get(0));
+      Wait.assertTrue(() -> FailoverEventType.FAILURE_DETECTED == listener.get(0));
       for (int i = 0; i < numMessages; i++) {
          JMSFailoverListenerTest.log.info("got message " + i);
 
@@ -178,12 +181,14 @@ public class JMSFailoverListenerTest extends ActiveMQTestBase {
 
       jbcfLive.setBlockOnNonDurableSend(true);
       jbcfLive.setBlockOnDurableSend(true);
+      jbcfLive.setCallTimeout(1000);
 
       ActiveMQConnectionFactory jbcfBackup = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, new TransportConfiguration(INVM_CONNECTOR_FACTORY, backupParams));
       jbcfBackup.setBlockOnNonDurableSend(true);
       jbcfBackup.setBlockOnDurableSend(true);
       jbcfBackup.setInitialConnectAttempts(-1);
       jbcfBackup.setReconnectAttempts(-1);
+      jbcfBackup.setRetryInterval(100);
 
       ActiveMQConnection connLive = (ActiveMQConnection) jbcfLive.createConnection();