You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/12/15 22:06:51 UTC

[activemq-artemis] branch main updated: ARTEMIS-4091 - Make scaleDown target more deterministic

This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c2a0d744c ARTEMIS-4091 - Make scaleDown target more deterministic
     new bb05e04f69 This closes #4288
5c2a0d744c is described below

commit 5c2a0d744cc75988762a7269100a3de0729dca48
Author: AntonRoskvist <an...@volvo.com>
AuthorDate: Tue Nov 15 12:06:52 2022 +0100

    ARTEMIS-4091 - Make scaleDown target more deterministic
---
 .../core/remoting/impl/netty/NettyConnection.java  |   5 +-
 .../artemis/core/server/ActiveMQServerLogger.java  |   2 +-
 .../cluster/impl/ClusterConnectionBridge.java      |   4 +-
 .../core/server/impl/LiveOnlyActivation.java       |  13 +++
 .../integration/server/ScaleDownDeterminism.java   | 108 +++++++++++++++++++++
 5 files changed, 129 insertions(+), 3 deletions(-)

diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
index b12a57a6a1..3cb640162b 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java
@@ -41,6 +41,7 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
+import org.apache.activemq.artemis.utils.ConfigurationHelper;
 import org.apache.activemq.artemis.utils.Env;
 import org.apache.activemq.artemis.utils.IPV6Util;
 import org.slf4j.Logger;
@@ -436,7 +437,9 @@ public class NettyConnection implements Connection {
             continue;
          }
          if (NettyConnectorFactory.class.getName().equals(cfg.getFactoryClassName())) {
-            if (configuration.get(TransportConstants.PORT_PROP_NAME).equals(cfg.getParams().get(TransportConstants.PORT_PROP_NAME))) {
+            int port1 = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration);
+            int port2 = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, cfg.getParams());
+            if (port1 == port2) {
                //port same, check host
                Object hostParam = configuration.get(TransportConstants.HOST_PROP_NAME);
                if (hostParam != null) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
index 1feee23ec0..6bcc08a45c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java
@@ -646,7 +646,7 @@ public interface ActiveMQServerLogger {
    @LogMessage(id = 222111, value = "exception while invoking {} on {}", level = LogMessage.Level.TRACE)
    void managementOperationError(String op, String resourceName, Exception e);
 
-   @LogMessage(id = 222112, value = "exception while retrieving attribute {} on {}", level = LogMessage.Level.WARN)
+   @LogMessage(id = 222112, value = "exception while retrieving attribute {} on {}", level = LogMessage.Level.TRACE)
    void managementAttributeError(String att, String resourceName, Exception e);
 
    @LogMessage(id = 222113, value = "On ManagementService stop, there are {} unexpected registered MBeans: {}", level = LogMessage.Level.WARN)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index be596c4662..c34c76dbfb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.ActiveMQAddressDoesNotExistException;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.Message;
@@ -430,7 +431,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
             try {
                queue.deleteQueue(true);
                queue.removeAddress();
-
+            } catch (ActiveMQAddressDoesNotExistException e) {
+               // ignore
             } catch (Exception e) {
                logger.warn(e.getMessage(), e);
             }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
index 79c27d62ac..a49b51a8ce 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LiveOnlyActivation.java
@@ -149,7 +149,20 @@ public class LiveOnlyActivation extends Activation {
          // a timeout is necessary here in case we use a NamedLiveNodeLocatorForScaleDown and there's no matching node in the cluster
          // should the timeout be configurable?
          nodeLocator.locateNode(ActiveMQClient.DEFAULT_DISCOVERY_INITIAL_WAIT_TIMEOUT);
+
          ClientSessionFactoryInternal clientSessionFactory = null;
+         if (nodeLocator instanceof AnyLiveNodeLocatorForScaleDown && scaleDownPolicy.getConnectors() != null) {
+            try {
+               clientSessionFactory = scaleDownServerLocator.connect();
+            } catch (Exception e) {
+               logger.trace("Failed to connect to {}", scaleDownPolicy.getConnectors().get(0));
+               if (clientSessionFactory != null) {
+                  clientSessionFactory.close();
+               }
+               clientSessionFactory = null;
+            }
+         }
+
          while (clientSessionFactory == null) {
             Pair<TransportConfiguration, TransportConfiguration> possibleLive = null;
             possibleLive = nodeLocator.getLiveConfiguration();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDeterminism.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDeterminism.java
new file mode 100644
index 0000000000..16070b3f1e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/ScaleDownDeterminism.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.server;
+
+import java.lang.invoke.MethodHandles;
+
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScaleDownDeterminism extends ClusterTestBase {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      setupLiveServer(0, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
+      servers[0].getConfiguration().setSecurityEnabled(true);
+      setupLiveServer(1, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
+      servers[1].getConfiguration().setSecurityEnabled(true);
+      setupLiveServer(2, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
+      servers[2].getConfiguration().setSecurityEnabled(true);
+
+      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1, 2);
+      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster0", "testAddress", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 2, 0, 1);
+
+      startServers(0, 1, 2);
+      setupSessionFactory(0, isNetty(), false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      setupSessionFactory(1, isNetty(), false, servers[1].getConfiguration().getClusterUser(), servers[1].getConfiguration().getClusterPassword());
+      setupSessionFactory(2, isNetty(), false, servers[2].getConfiguration().getClusterUser(), servers[2].getConfiguration().getClusterPassword());
+      logger.debug("===============================");
+      logger.debug("Node 0: {}", servers[0].getClusterManager().getNodeId());
+      logger.debug("Node 1: {}", servers[1].getClusterManager().getNodeId());
+      logger.debug("Node 2: {}", servers[2].getClusterManager().getNodeId());
+      logger.debug("===============================");
+
+      servers[0].setIdentity("Node0");
+      servers[1].setIdentity("Node1");
+      servers[2].setIdentity("Node2");
+   }
+
+   protected boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testScaleDownDeterministically() throws Exception {
+      final String queueName = "testQueue";
+      final int messageCount = 10;
+
+      ClientSession session = sfs[0].createSession(servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword(), false, true, false, false, 0);
+      createQueue(0, queueName, queueName, null, false, servers[0].getConfiguration().getClusterUser(), servers[0].getConfiguration().getClusterPassword());
+      ClientProducer producer = session.createProducer(queueName);
+
+      for (int i = 0; i < messageCount; i++) {
+         producer.send(session.createMessage(false));
+      }
+      session.close();
+      sfs[0].close();
+
+      servers[0].getActiveMQServerControl().addConnector("scaleDown", "tcp://localhost:61617");
+      //Connectors set up in test do ot use the host param whicvh is needed for above command
+      //Removing host param so that cluster connector matches new scaleDown connector
+      servers[0].getConfiguration().getConnectorConfigurations().get("scaleDown").getParams().remove("host");
+
+      String server0connector2 = servers[0].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors().get(1);
+      String server1connector1 = servers[1].getConfiguration().getClusterConfigurations().iterator().next().getStaticConnectors().get(0);
+
+      servers[0].getActiveMQServerControl().scaleDown("scaleDown");
+      Assert.assertEquals(messageCount, servers[1].getTotalMessageCount());
+      servers[0].start();
+
+      waitForServerToStart(servers[0]);
+      Assert.assertEquals(0, servers[0].getTotalMessageCount());
+
+      servers[1].getActiveMQServerControl().scaleDown(server1connector1);
+      Assert.assertEquals(messageCount, servers[0].getTotalMessageCount());
+      servers[1].start();
+
+      waitForServerToStart(servers[1]);
+      servers[0].getActiveMQServerControl().scaleDown(server0connector2);
+      Assert.assertEquals(messageCount, servers[2].getTotalMessageCount());
+   }
+}