You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2023/04/21 03:35:00 UTC

[jira] [Work logged] (ARTEMIS-4247) Inconsistencies between AMQP Mirror and Artemis Clustering

     [ https://issues.apache.org/jira/browse/ARTEMIS-4247?focusedWorklogId=858354&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-858354 ]

ASF GitHub Bot logged work on ARTEMIS-4247:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Apr/23 03:34
            Start Date: 21/Apr/23 03:34
    Worklog Time Spent: 10m 
      Work Description: clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1173271714


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   /**
+    * Creates and starts the four brokers used by the tests: the clustered pair A_1/A_2, each one
+    * configured with an AMQP mirror towards the matching B port, followed by the clustered pair
+    * B_1/B_2, which is created with mirror port -1 and therefore gets no mirror connection.
+    */
+   @Before
+   public void setCluster() throws Exception {
+      // mirror sources: A_1 mirrors towards B_1_PORT, A_2 towards B_2_PORT
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+      for (ActiveMQServer sourceNode : new ActiveMQServer[] {a1, a2}) {
+         sourceNode.start();
+      }
+
+      // mirror targets: clustered with each other, no outgoing mirror
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+      for (ActiveMQServer targetNode : new ActiveMQServer[] {b1, b2}) {
+         targetNode.start();
+      }
+   }
+
+   /**
+    * Builds a broker on {@code thisPort} that declares the test queue and topic addresses, uses a
+    * zero redistribution delay for every address, joins a static cluster connection towards
+    * {@code clusterPort} and, optionally, mirrors over AMQP towards {@code mirrorPort}.
+    *
+    * @param name        identity assigned to the server
+    * @param thisPort    port handed to createServer and used for the "thisNode" connector
+    * @param clusterPort port of the "otherNode" connector, the only static connector of the cluster connection
+    * @param mirrorPort  port to mirror towards; the AMQP mirror connection is only added when this is greater than zero
+    * @return the configured server; setCluster is responsible for starting it
+    */
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+
+      // one durable anycast queue and one multicast address
+      QueueConfiguration redistQueue = new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST);
+      CoreAddressConfiguration queueAddress = new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(redistQueue);
+      CoreAddressConfiguration topicAddress = new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST);
+      server.getConfiguration().addAddressConfiguration(queueAddress);
+      server.getConfiguration().addAddressConfiguration(topicAddress);
+
+      // drop any pre-existing address settings, then apply a zero redistribution delay under the "#" match
+      server.getConfiguration().clearAddressSettings();
+      AddressSettings zeroDelaySettings = new AddressSettings().setRedistributionDelay(0);
+      server.getConfiguration().addAddressSetting("#", zeroDelaySettings);
+
+
+      server.setIdentity(name);
+      // NOTE(review): every broker gets the same configuration name "node_1" - confirm this is intended
+      server.getConfiguration().setName("node_1");
+      server.getConfiguration().setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration());
+      server.getConfiguration().addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort);
+      server.getConfiguration().addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration()
+         .setName("cluster")
+         .setConnectorName("thisNode")
+         .setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION)
+         .setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         AMQPMirrorBrokerConnectionElement mirrorElement = new AMQPMirrorBrokerConnectionElement();
+         mirrorElement.setDurable(true);
+         mirrorElement.setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort));
+
+         // reconnect attempts of -1 with a retry interval of 100
+         AMQPBrokerConnectConfiguration mirrorConnection = new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort);
+         mirrorConnection.setReconnectAttempts(-1);
+         mirrorConnection.setRetryInterval(100);
+         mirrorConnection.addConnectionElement(mirrorElement);
+         server.getConfiguration().addAMQPConnection(mirrorConnection);
+      }
+
+      return server;
+   }
+
+   /** Runs the anycast queue redistribution scenario with clients created for the "AMQP" protocol. */
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      final String protocol = "AMQP";
+      internalQueueRedistribution(protocol);
+   }
+
+   /** Runs the anycast queue redistribution scenario with clients created for the "CORE" protocol. */
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      final String protocol = "CORE";
+      internalQueueRedistribution(protocol);
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   /** Runs the shared durable subscription redistribution scenario with clients created for the "AMQP" protocol. */
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      final String protocol = "AMQP";
+      internalTopicRedistribution(protocol);
+   }
+
+   /** Runs the shared durable subscription redistribution scenario with clients created for the "CORE" protocol. */
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      final String protocol = "CORE";
+      internalTopicRedistribution(protocol);
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });
+
+      Queue a1TopicSubscription = a1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a1TopicSubscription);
+      Queue a2TopicSubscription = a2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2TopicSubscription);
+      Queue b1TopicSubscription = b1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(b1TopicSubscription);
+      Queue b2TopicSubscription = b2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2);
+
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(topic);
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      Assert.assertEquals(0, a1TopicSubscription.getConsumerCount());
+      Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, a2TopicSubscription::getMessageCount);
+
+      logger.debug("b1={}. b2={}", b1TopicSubscription.getMessageCount(), b2TopicSubscription.getMessageCount());
+
+      Wait.assertEquals(numMessages / 2, b1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, b2TopicSubscription::getMessageCount);
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < numMessages; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createSharedDurableConsumer(topic, subscriptionName);
+            } else {
+               place = "A2";
+               consumer = sessionA2.createSharedDurableConsumer(topic, subscriptionName);
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      assertEmptyQueue(a1TopicSubscription);
+      assertEmptyQueue(a2TopicSubscription);
+      assertEmptyQueue(b1TopicSubscription);
+      assertEmptyQueue(b2TopicSubscription);
+   }
+
+   /**
+    * Plays with remote binding routing, similarly to how topic redistribution would happen: ten
+    * shared durable subscriptions are created on both A_1 and A_2, a core message is routed on A_2
+    * straight through the single remote binding whose cluster name starts with the "_0"
+    * subscription, and the message is then expected on that subscription's queue on A_1 and B_1
+    * only, with every other checked queue staying at zero.
+    */
+   @Test
+   public void testRemoteBindingRouting() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      // same ten subscription names on both A nodes
+      try (Connection connectionA1 = cfA1.createConnection()) {
+         Session sessionA1 = connectionA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = sessionA1.createTopic(TOPIC_NAME);
+         for (int sub = 0; sub < 10; sub++) {
+            sessionA1.createSharedDurableConsumer(topic, subscriptionName + "_" + sub);
+         }
+      }
+
+      try (Connection connectionA2 = cfA2.createConnection()) {
+         Session sessionA2 = connectionA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = sessionA2.createTopic(TOPIC_NAME);
+         for (int sub = 0; sub < 10; sub++) {
+            sessionA2.createSharedDurableConsumer(topic, subscriptionName + "_" + sub);
+         }
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+
+      // collect, on a2, the remote bindings that belong to the "_0" subscription
+      List<RemoteQueueBinding> remoteBindingsOnA2 = new ArrayList<>();
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((bindingName, binding) -> {
+         if (!(binding instanceof RemoteQueueBindingImpl)) {
+            return;
+         }
+         if (!binding.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            return;
+         }
+         logger.debug("{} = {}", bindingName, binding);
+         remoteBindingsOnA2.add((RemoteQueueBinding) binding);
+      });
+
+      Assert.assertEquals(1, remoteBindingsOnA2.size());
+
+      TransactionImpl routingTransaction = new TransactionImpl(a2.getStorageManager());
+      RoutingContext routingContext = new RoutingContextImpl(routingTransaction).setMirrorOption(MirrorOption.individualRoute);
+
+      Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
+      directMessage.setAddress(TOPIC_NAME);
+      directMessage.putStringProperty("Test", "t1");
+
+      // route through the selected remote binding only, then let the post office process and commit it
+      RemoteQueueBinding targetBinding = remoteBindingsOnA2.get(0);
+      targetBinding.route(directMessage, routingContext);
+      a2.getPostOffice().processRoute(directMessage, routingContext, false);
+      routingContext.getTransaction().commit();
+
+      for (int sub = 0; sub < 10; sub++) {
+         String name = "my-topic-shared-subscription_" + sub + ":global";
+         // only the "_0" subscription is expected to hold the routed message
+         int expectedOnFirstNodes = sub == 0 ? 1 : 0;
+
+         Wait.assertEquals(expectedOnFirstNodes, a1.locateQueue(name)::getMessageCount);
+         logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
+         logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
+         logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
+         logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
+         Wait.assertEquals(expectedOnFirstNodes, b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }
+   }
+
+   // This test has distinct subscription on each node and it is making sure the Mirror Routing is working accurately
+   @Test
+   public void testMultiNodeSubscription() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 10; i < 20; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageProducer producer = session.createProducer(topic);
+         producer.send(session.createTextMessage("hello"));
+      }
+
+      Thread.sleep(5000);

Review Comment:
   oops... removing this





Issue Time Tracking
-------------------

    Worklog Id:     (was: 858354)
    Time Spent: 20m  (was: 10m)

> Inconsistencies between AMQP Mirror and Artemis Clustering
> ----------------------------------------------------------
>
>                 Key: ARTEMIS-4247
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-4247
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Clebert Suconic
>            Priority: Major
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> - activemq.notifications are being transferred to the target node, unless an ignore is set up
> - topics are being duplicated after redistribution
> - topic sends are being duplicated when a 2-node cluster mirrors to another 2-node cluster, and both nodes are mirrored. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)