Posted to gitbox@activemq.apache.org by "clebertsuconic (via GitHub)" <gi...@apache.org> on 2023/04/21 02:22:42 UTC

[GitHub] [activemq-artemis] clebertsuconic opened a new pull request, #4443: ARTEMIS-4247 Mirror and Clustering are not playing well together

clebertsuconic opened a new pull request, #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443

   - activemq.notifications are being transferred to the target node unless an ignore is set up
   - topics are being duplicated after redistribution
   - topic sends are being duplicated when a 2-node cluster mirrors to another 2-node cluster and both nodes are mirrored


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@activemq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [activemq-artemis] jbertram merged pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "jbertram (via GitHub)" <gi...@apache.org>.
jbertram merged PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443




[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174943429


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
+
+      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
+         ArrayList<String> queues = new ArrayList<>();
+         routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
+         daMap.put(TARGET_QUEUES, queues);
+      }

Review Comment:
   Ok, sounds like it's fine then. I just wanted to check, as it's not obvious; it is very hard to tell things like that from the context if you don't already know it.
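
For readers without the surrounding context, the hunk above can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: FakeRoutingContext, annotate, and the key string are hypothetical stand-ins rather than Artemis classes, and the real value of the TARGET_QUEUES constant is not shown in the diff. It assumes the intent is that the list of durable queue names is attached to the mirrored message only when the routing context is flagged for individual routing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Stand-alone model of the hunk above; none of these types are Artemis classes.
final class TargetQueuesSketch {

   // Hypothetical key: the value of the real TARGET_QUEUES constant is not shown in the diff.
   static final String TARGET_QUEUES = "hypothetical.target.queues";

   // Hypothetical stand-in for RoutingContext, reduced to the two calls the hunk uses.
   static final class FakeRoutingContext {
      private final boolean mirrorIndividualRoute;
      private final List<String> durableQueueNames;

      FakeRoutingContext(boolean mirrorIndividualRoute, List<String> durableQueueNames) {
         this.mirrorIndividualRoute = mirrorIndividualRoute;
         this.durableQueueNames = durableQueueNames;
      }

      boolean isMirrorIndividualRoute() {
         return mirrorIndividualRoute;
      }

      void forEachDurable(Consumer<String> action) {
         durableQueueNames.forEach(action);
      }
   }

   // Same shape as the added block: the queue list is recorded only for individual routes.
   static Map<String, Object> annotate(FakeRoutingContext routingContext) {
      Map<String, Object> daMap = new HashMap<>();
      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
         List<String> queues = new ArrayList<>();
         routingContext.forEachDurable(queues::add);
         daMap.put(TARGET_QUEUES, queues);
      }
      return daMap;
   }
}
```

In this model a null context or a context that is not an individual route leaves the map untouched, which matches the guard in the hunk.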





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174126520


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA2.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propagated into b1
+         a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propagated into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });

Review Comment:
   a1.locateQueue
   a2.locateQueue
   b1.locateQueue
   b2.locateQueue

   These calls already do that: they would return null if the queue had not been propagated.
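
The point about locateQueue returning null can be combined with a polling wait, which is roughly the pattern the Wait.assertTrue calls in the test follow. The helper below is a minimal stand-alone sketch: waitFor, its parameters, and the map-backed FakeBroker are assumptions made for illustration, not the Artemis Wait utility or the ActiveMQServer API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BooleanSupplier;

// Stand-alone illustration; none of these types are Artemis classes.
final class LocateQueueWaitSketch {

   // Hypothetical stand-in for a broker: locateQueue returns null when the queue is unknown.
   static final class FakeBroker {
      private final Map<String, Object> queues = new ConcurrentHashMap<>();

      void createQueue(String name) {
         queues.put(name, new Object());
      }

      Object locateQueue(String name) {
         return queues.get(name);
      }
   }

   // Polls until the condition holds (returns true) or the timeout elapses (returns false).
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
      long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
      while (!condition.getAsBoolean()) {
         if (System.nanoTime() - deadline >= 0) {
            return false;
         }
         try {
            Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
            // Preserve the interrupt flag and give up waiting.
            Thread.currentThread().interrupt();
            return false;
         }
      }
      return true;
   }
}
```

A caller would wrap the lookup as `waitFor(() -> broker.locateQueue(name) != null, timeout, sleep)`, so a missing queue shows up as a failed wait instead of a later NullPointerException.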





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1173271714


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA2.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propagated into b1
+         a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propagated into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });
+
+      Queue a1TopicSubscription = a1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a1TopicSubscription);
+      Queue a2TopicSubscription = a2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2TopicSubscription);
+      Queue b1TopicSubscription = b1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(b1TopicSubscription);
+      Queue b2TopicSubscription = b2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(b2TopicSubscription);
+
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(topic);
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      Assert.assertEquals(0, a1TopicSubscription.getConsumerCount());
+      Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, a2TopicSubscription::getMessageCount);
+
+      logger.debug("b1={}. b2={}", b1TopicSubscription.getMessageCount(), b2TopicSubscription.getMessageCount());
+
+      Wait.assertEquals(numMessages / 2, b1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, b2TopicSubscription::getMessageCount);
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < numMessages; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createSharedDurableConsumer(topic, subscriptionName);
+            } else {
+               place = "A2";
+               consumer = sessionA2.createSharedDurableConsumer(topic, subscriptionName);
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      assertEmptyQueue(a1TopicSubscription);
+      assertEmptyQueue(a2TopicSubscription);
+      assertEmptyQueue(b1TopicSubscription);
+      assertEmptyQueue(b2TopicSubscription);
+   }
+
+   // This test is playing with Remote binding routing, similarly to how topic redistribution would happen
+   @Test
+   public void testRemoteBindingRouting() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propaged into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
+         if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+
+      RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+
+      Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
+      directMessage.setAddress(TOPIC_NAME);
+      directMessage.putStringProperty("Test", "t1");
+      remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
+      a2.getPostOffice().processRoute(directMessage, routingContext, false);
+      routingContext.getTransaction().commit();
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+         Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount);
+         logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
+         logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
+         logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
+         logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
+         Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }
+   }
+
+   // This test has distinct subscription on each node and it is making sure the Mirror Routing is working accurately
+   @Test
+   public void testMultiNodeSubscription() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 10; i < 20; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageProducer producer = session.createProducer(topic);
+         producer.send(session.createTextMessage("hello"));
+      }
+
+      Thread.sleep(5000);

Review Comment:
   oops... removing this
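
   A fixed sleep like the one flagged here is usually replaced by polling for the expected state, which is what the `Wait.assertTrue(...)` calls earlier in this test already do. The block below is a minimal sketch of that idea only. `PollingWait` and `waitFor` are hypothetical names for illustration, not the Artemis `Wait` utility.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not the Artemis Wait utility.
final class PollingWait {

   private PollingWait() {
   }

   // Polls the condition until it holds or the timeout expires.
   // Returns true as soon as the condition is met, false on timeout or interrupt.
   static boolean waitFor(BooleanSupplier condition, long timeoutMillis, long sleepMillis) {
      final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
      while (true) {
         if (condition.getAsBoolean()) {
            return true;
         }
         if (System.nanoTime() - deadline >= 0) {
            return false;
         }
         try {
            Thread.sleep(sleepMillis);
         } catch (InterruptedException e) {
            // Restore the interrupt flag and give up waiting.
            Thread.currentThread().interrupt();
            return false;
         }
      }
   }
}
```

   Compared with `Thread.sleep(5000)`, a test using this pattern returns as soon as the condition holds and only pays the full timeout when it fails.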





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174061378


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   I only see this being used to set a toggle on RoutingContext? A 'MirrorOption' enum sitting at the root of the server package tree would, to me, most typically suggest it's about setting options for the Mirror itself. It isn't; it's about controlling RoutingContext behaviour. MirrorOption is perhaps the last name I would give it from how I see it being used.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174094495


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   I had it nested at first. But then the syntax got better in other usages.
   
   The way I see it, this controls how mirroring happens after routing.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174142957


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
+
+      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
+         ArrayList<String> queues = new ArrayList<>();
+         routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
+         daMap.put(TARGET_QUEUES, queues);
+      }

Review Comment:
   Sending it empty wouldn't be wrong.
   
   The only scenario where this could happen is if the queue was removed in flight.
   
   Testing this will be a bit complicated, though.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1175831966


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");

Review Comment:
   the PR I sent is addressing this.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174147176


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
+
+      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
+         ArrayList<String> queues = new ArrayList<>();
+         routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
+         daMap.put(TARGET_QUEUES, queues);
+      }

Review Comment:
   I would need to test an impossible situation, namely an artificial operation routing to a local non-durable queue.
   
   Clustered routings will always happen after durable queues.





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174062204


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -707,6 +708,10 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
 
       this.server = server;
 
+      if (queueConfiguration.isInternal()) {
+         this.internalQueue = queueConfiguration.isInternal();
+      }

Review Comment:
   The field is a boolean though?





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174056084


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
+
+      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
+         ArrayList<String> queues = new ArrayList<>();
+         routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
+         daMap.put(TARGET_QUEUES, queues);
+      }

Review Comment:
   So it should send the empty list if there are no durable queues? Or does that mean it shouldn't even send the message?





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174968350


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java:
##########
@@ -577,6 +599,9 @@ private String debugBindings() {
    private void routeFromCluster(final Message message,
                                  final RoutingContext context,
                                  final byte[] ids) throws Exception {
+      if (!context.isMirrorDisabled()) {
+         context.setMirrorOption(MirrorOption.individualRoute);
+      }

Review Comment:
   The use of clear looks to be semi-optional in places, e.g. this method and ServerSessionImpl.send() only do it when the context 'isn't reusable'. Should setting this explicitly mark it 'not reusable'?





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1175004814


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   Having it occupy the 'prime real estate' of the name MirrorOption at the very root of the code namespace seems wrong given what it's used for. It's not for holding general options for the mirror; it's a fairly niche, almost-internal routing control setting for use with RoutingContext.
   
   Even if it were left named MirrorOption, it still seems like it would be far nicer as a nested enum within RoutingContext, the only place it is used and seems likely to be used. None of the impl-only code using it would really change beyond updating its import (probably even removing one for RoutingContextImpl, actually).
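
   The nesting suggested above could look roughly like the following. This is a sketch under stated assumptions, not the code in the PR: `RoutingContextSketch` and `RoutingContextSketchImpl` are hypothetical stand-ins for RoutingContext and RoutingContextImpl, and while `disabled` and `individualRoute` appear in the diff, the `enabled` value is assumed here as a default.

```java
// Hypothetical stand-in for RoutingContext, with the enum nested inside it.
interface RoutingContextSketch {

   // 'disabled' and 'individualRoute' appear in the diff; 'enabled' is an assumed default.
   enum MirrorOption {
      enabled, disabled, individualRoute
   }

   MirrorOption getMirrorOption();

   RoutingContextSketch setMirrorOption(MirrorOption option);

   default boolean isMirrorDisabled() {
      return getMirrorOption() == MirrorOption.disabled;
   }

   default boolean isMirrorIndividualRoute() {
      return getMirrorOption() == MirrorOption.individualRoute;
   }
}

// Hypothetical stand-in for RoutingContextImpl. The nested enum is inherited
// as a member type, so the implementation needs no import for it.
final class RoutingContextSketchImpl implements RoutingContextSketch {

   private MirrorOption mirrorOption = MirrorOption.enabled;

   @Override
   public MirrorOption getMirrorOption() {
      return mirrorOption;
   }

   @Override
   public RoutingContextSketch setMirrorOption(MirrorOption option) {
      this.mirrorOption = option;
      return this;
   }
}
```

   Callers outside the type would write `RoutingContextSketch.MirrorOption.individualRoute`, which ties the setting to the routing context by name.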





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174044880


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
+
+      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
+         ArrayList<String> queues = new ArrayList<>();
+         routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
+         daMap.put(TARGET_QUEUES, queues);
+      }

Review Comment:
   Non-durable queues are not passed along on clustering; this is really just for durable queues.
   
   The Mirror Individual Route is there to avoid sending a message to the entire fan-out when cluster redistribution happens.
   
   Say you had a clustered message redistribution from one particular queue to another queue, which is a clustering operation. In that case we need to pass the message to the specific queues only.
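
   To illustrate the difference between a full fan-out and an individual route, here is a toy model, not the Artemis implementation. The class `MirrorTargetSketch`, the method `selectQueues`, and the convention that `null` means no target-queue list was sent are all assumptions made for this sketch. How the real mirror target treats an empty list is not stated in this thread; here it simply selects nothing.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of queue selection on the mirror target; all names are hypothetical.
final class MirrorTargetSketch {

   private MirrorTargetSketch() {
   }

   // bindings:     queue names bound to the address on the mirror target
   // targetQueues: null when the source sent no target-queue list (regular routing),
   //               otherwise the queues the source actually routed to
   static List<String> selectQueues(List<String> bindings, List<String> targetQueues) {
      if (targetQueues == null) {
         // Regular mirroring: the message fans out to every bound queue.
         return new ArrayList<>(bindings);
      }
      // Individual route: deliver only to the queues named by the source.
      List<String> selected = new ArrayList<>();
      for (String queue : bindings) {
         if (targetQueues.contains(queue)) {
            selected.add(queue);
         }
      }
      return selected;
   }
}
```

   With three subscriptions bound and a redistribution that touched only one of them, the individual route selects one queue instead of all three, which is the duplication the change is meant to avoid.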





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174143682


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");
+
    // Capabilities
    public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
    public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
 
    public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
    public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());
 
-   private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorDisabled(true));
+   private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));

Review Comment:
   This is actually what it is: controlling the outcome of mirroring.
   In this case routing will produce no mirroring, which is exactly what's being set here.





[GitHub] [activemq-artemis] clebertsuconic commented on pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#issuecomment-1521024403

   addressing comments at https://github.com/apache/activemq-artemis/pull/4445




[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1173948572


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java:
##########
@@ -577,6 +599,9 @@ private String debugBindings() {
    private void routeFromCluster(final Message message,
                                  final RoutingContext context,
                                  final byte[] ids) throws Exception {
+      if (!context.isMirrorDisabled()) {
+         context.setMirrorOption(MirrorOption.individualRoute);
+      }

Review Comment:
   Why? Where is this reset?



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -357,6 +364,13 @@ private static void setProtocolData(MessageReference ref, String brokerID, long
             daMap.put(INTERNAL_DESTINATION, ref.getMessage().getAddress());
          }
       }
+
+      if (routingContext != null && routingContext.isMirrorIndividualRoute()) {
+         ArrayList<String> queues = new ArrayList<>();
+         routingContext.forEachDurable(q -> queues.add(String.valueOf(q.getName())));
+         daMap.put(TARGET_QUEUES, queues);
+      }

Review Comment:
   Why does it only look at durable queues?
   
   What if there are none? Should it actually be sending an empty list as it seems it will?
   
   (Also, what is a 'MirrorIndividualRoute'?)
   



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");

Review Comment:
   The last sentence of the javadoc (should that be a comment?) is a bit broken.
   
   Is this mirroring specific? It's defined in a mirroring class, and seemingly only referenced in others, suggesting so. If so, should it start with "x-opt-amq-mr-" like all the other mirroring-related key names?



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   This can never realistically carry any other 'mirror options', so that name really doesn't seem to fit well. As the comment says, it's just a routing control really. RoutingContextMirrorControl?
   



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);

Review Comment:
   All server names are "node_1"? What is the impact of that?
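
One possible way to avoid the shared name, offered as a sketch only: derive each configuration name from the identity that is already passed to `createClusteredServer`. The `ServerNames` class and its `"node_" + identity` scheme below are hypothetical and are not part of the PR or of Artemis. In the test itself, the analogous change would presumably be to pass a value based on `name` to `setName(...)` instead of the literal `"node_1"`. Whether the duplicate name actually affects the test is not established in this thread.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the PR or of Artemis.
class ServerNames {

   // Derives a configuration name from the server identity (assumed naming scheme).
   static String configurationName(String identity) {
      return "node_" + identity;
   }

   // Returns true when every identity maps to a distinct configuration name.
   static boolean allDistinct(List<String> identities) {
      Set<String> seen = new HashSet<>();
      for (String identity : identities) {
         if (!seen.add(configurationName(identity))) {
            return false;
         }
      }
      return true;
   }

   public static void main(String[] args) {
      // Identities used by setCluster() in the test under review.
      List<String> identities = Arrays.asList("A_1", "A_2", "B_1", "B_2");
      for (String identity : identities) {
         System.out.println(identity + " -> " + configurationName(identity));
      }
      System.out.println("all distinct: " + allDistinct(identities));
   }
}
```

Deriving the name from the identity keeps the four brokers distinguishable in logs and management output without adding another parameter to the helper.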



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1

Review Comment:
   Typo: "propaged" should be "propagated".



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2

Review Comment:
   Typo: "propaged" should be "propagated".



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });

Review Comment:
   This should verify and assert, as the previous check above does, that the forEach actually did something.
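
   As a rough sketch of that pattern (using hypothetical stand-in types, not the real Binding, LocalQueueBinding or Wait classes), the lambda could record each name it handles so that the test can assert afterwards that at least one local binding was actually visited:

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public class ForEachAssertSketch {

    // Hypothetical stand-ins for the broker's binding types; assumptions for illustration only.
    interface Binding {
        String uniqueName();
    }

    static final class LocalBinding implements Binding {
        private final String name;
        LocalBinding(String name) { this.name = name; }
        public String uniqueName() { return name; }
    }

    static final class RemoteBinding implements Binding {
        private final String name;
        RemoteBinding(String name) { this.name = name; }
        public String uniqueName() { return name; }
    }

    // Visits every binding and returns the names of the local ones,
    // so the caller can check that the loop body ran for something.
    static Set<String> collectLocalQueueNames(Map<String, Binding> bindings) {
        Set<String> names = new HashSet<>();
        bindings.forEach((n, b) -> {
            if (b instanceof LocalBinding) {
                names.add(b.uniqueName());
            }
        });
        return names;
    }

    public static void main(String[] args) {
        Map<String, Binding> bindings = new LinkedHashMap<>();
        bindings.put("local", new LocalBinding("sub-queue"));
        bindings.put("remote", new RemoteBinding("sub-queue-remote"));

        Set<String> found = collectLocalQueueNames(bindings);

        // Without this check, an empty or all-remote map would pass silently.
        if (found.size() != 1) {
            throw new AssertionError("expected exactly one local binding, found " + found.size());
        }
        System.out.println("local bindings visited: " + found);
    }
}
```

   In the test itself, the same idea would presumably mean collecting qb.getUniqueName() for the a2 bindings and asserting on the size of that collection after the forEach, mirroring the a1 block.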



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      assertEmptyQueue(a1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(a2.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b1.locateQueue(QUEUE_NAME));
+      assertEmptyQueue(b2.locateQueue(QUEUE_NAME));
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+   }
+
+   @Test
+   public void testTopicRedistributionAMQP() throws Exception {
+      internalTopicRedistribution("AMQP");
+   }
+
+   @Test
+   public void testTopicRedistributionCORE() throws Exception {
+      internalTopicRedistribution("CORE");
+   }
+
+   public void internalTopicRedistribution(String protocol) throws Exception {
+
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      final int numMessages = 100;
+
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         MessageConsumer consumer = session.createSharedDurableConsumer(topic, subscriptionName);
+         consumer.close();
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 2);
+
+      // naming convention is different between the protocols, I'm navigating through the bindings to find the actual queue name
+      String subscriptionQueueName;
+
+      {
+         HashSet<String> subscriptionSet = new HashSet<>();
+         // making sure the queues created on a1 are propaged into b1
+         a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+            logger.debug("{} = {}", n, b);
+            if (b instanceof LocalQueueBinding) {
+               QueueBinding qb = (QueueBinding) b;
+               subscriptionSet.add(qb.getUniqueName().toString());
+               Wait.assertTrue(() -> b1.locateQueue(qb.getUniqueName()) != null);
+            }
+         });
+         Assert.assertEquals(1, subscriptionSet.size());
+         subscriptionQueueName = subscriptionSet.iterator().next();
+      }
+
+      // making sure the queues created on a2 are propaged into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((n, b) -> {
+         logger.debug("{} = {}", n, b);
+         if (b instanceof LocalQueueBinding) {
+            QueueBinding qb = (QueueBinding) b;
+            Wait.assertTrue(() -> b2.locateQueue(qb.getUniqueName()) != null);
+         }
+      });
+
+      Queue a1TopicSubscription = a1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a1TopicSubscription);
+      Queue a2TopicSubscription = a2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2TopicSubscription);
+      Queue b1TopicSubscription = b1.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(b1TopicSubscription);
+      Queue b2TopicSubscription = b2.locateQueue(subscriptionQueueName);
+      Assert.assertNotNull(a2);
+
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(topic);
+         for (int i = 0; i < numMessages; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      Assert.assertEquals(0, a1TopicSubscription.getConsumerCount());
+      Wait.assertEquals(numMessages / 2, a1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, a2TopicSubscription::getMessageCount);
+
+      logger.debug("b1={}. b2={}", b1TopicSubscription.getMessageCount(), b2TopicSubscription.getMessageCount());
+
+      Wait.assertEquals(numMessages / 2, b1TopicSubscription::getMessageCount);
+      Wait.assertEquals(numMessages / 2, b2TopicSubscription::getMessageCount);
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < numMessages; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createSharedDurableConsumer(topic, subscriptionName);
+            } else {
+               place = "A2";
+               consumer = sessionA2.createSharedDurableConsumer(topic, subscriptionName);
+            }
+            TextMessage message = (TextMessage) consumer.receive(5000);
+            Assert.assertNotNull(message);
+            logger.debug("Received message {} from {}", message, place);
+            consumer.close();
+         }
+      }
+
+      // if you see this message, most likely the notifications are being copied to the mirror
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222196"));
+      Assert.assertFalse(AssertionLoggerHandler.findText("AMQ224037"));
+
+      assertEmptyQueue(a1TopicSubscription);
+      assertEmptyQueue(a2TopicSubscription);
+      assertEmptyQueue(b1TopicSubscription);
+      assertEmptyQueue(b2TopicSubscription);
+   }
+
+   // This test is playing with Remote binding routing, similarly to how topic redistribution would happen
+   @Test
+   public void testRemoteBindingRouting() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propaged into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
+         if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+
+      RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+
+      Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
+      directMessage.setAddress(TOPIC_NAME);
+      directMessage.putStringProperty("Test", "t1");
+      remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
+      a2.getPostOffice().processRoute(directMessage, routingContext, false);
+      routingContext.getTransaction().commit();
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+         Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount);
+         logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
+         logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
+         logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
+         logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
+         Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }

Review Comment:
   A comment explaining what this bit is doing might be in order. It took me a while to notice the 'individualRoute' option and the 'get(0).route' tricks. Until then, reading the test, it just looked like only 1 of 10 subscriptions got the message when all subscriptions would be expected to get it. Only on closer inspection did those tricks come out. Later, without the context of the wider commit, it will be harder to understand than it is now.



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -707,6 +708,10 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
 
       this.server = server;
 
+      if (queueConfiguration.isInternal()) {
+         this.internalQueue = queueConfiguration.isInternal();
+      }

Review Comment:
   The if is superfluous; just set the value.
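
   A small sketch of why the two forms should be equivalent, assuming the field has not been assigned earlier in the constructor (so it still holds its default of false) and that isInternal() returns a primitive boolean. QueueConfig here is a hypothetical stand-in, not the real QueueConfiguration:

```java
public class InternalFlagSketch {

    // Hypothetical stand-in for the configuration object; an assumption for illustration only.
    static final class QueueConfig {
        private final boolean internal;
        QueueConfig(boolean internal) { this.internal = internal; }
        boolean isInternal() { return internal; }
    }

    // Guarded form, as in the diff: only assigns when the flag is true.
    static boolean guarded(QueueConfig config) {
        boolean internalQueue = false; // models the field's default value
        if (config.isInternal()) {
            internalQueue = config.isInternal();
        }
        return internalQueue;
    }

    // Direct form: assigns unconditionally.
    static boolean direct(QueueConfig config) {
        boolean internalQueue = config.isInternal();
        return internalQueue;
    }

    public static void main(String[] args) {
        for (boolean flag : new boolean[] {true, false}) {
            QueueConfig config = new QueueConfig(flag);
            if (guarded(config) != direct(config)) {
                throw new AssertionError("forms differ for internal=" + flag);
            }
        }
        System.out.println("guarded and direct assignment agree for both values");
    }
}
```

   If the field could already have been set to true before this point, the guard would change behaviour, so that is worth a quick check before simplifying.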



##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/RoutingContext.java:
##########
@@ -40,9 +41,15 @@ public interface RoutingContext {
 
    /** If the routing is from MirrorController, we don't redo mirrorController
     *  to avoid*/
+   MirrorOption getMirrorOption();

Review Comment:
   Existing comment above it is inaccurate now.



##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");
+
    // Capabilities
    public static final Symbol MIRROR_CAPABILITY = Symbol.getSymbol("amq.mirror");
    public static final Symbol QPID_DISPATCH_WAYPOINT_CAPABILITY = Symbol.valueOf("qd.waypoint");
 
    public static final SimpleString INTERNAL_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(INTERNAL_ID.toString());
    public static final SimpleString INTERNAL_BROKER_ID_EXTRA_PROPERTY = SimpleString.toSimpleString(BROKER_ID.toString());
 
-   private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorDisabled(true));
+   private static final ThreadLocal<RoutingContext> mirrorControlRouting = ThreadLocal.withInitial(() -> new RoutingContextImpl(null).setMirrorOption(MirrorOption.disabled));

Review Comment:
   o.a.a.a.core.server.MirrorOption seems like an odd name for this, when it's not really a general option of the Mirror but a specific behaviour control for the RoutingContext. A more routing-specific name would seem appropriate.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp.connect;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
+import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
+import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
+import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
+import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
+import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MirrorOption;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.cluster.impl.RemoteQueueBindingImpl;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
+import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
+import org.apache.activemq.artemis.tests.integration.amqp.AmqpTestSupport;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQPRedistributeClusterTest extends AmqpTestSupport {
+
+   private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   private static final String QUEUE_NAME = "REDIST_QUEUE";
+   private static final String TOPIC_NAME = "REDIST_TOPIC";
+   private static final SimpleString TOPIC_NAME_SIMPLE_STRING = SimpleString.toSimpleString("REDIST_TOPIC");
+
+   protected static final int A_1_PORT = 5673;
+   protected static final int A_2_PORT = 5674;
+
+   ActiveMQServer a1;
+   ActiveMQServer a2;
+
+   protected static final int B_1_PORT = 5773;
+   protected static final int B_2_PORT = 5774;
+
+   ActiveMQServer b1;
+   ActiveMQServer b2;
+
+   @Before
+   public void setCluster() throws Exception {
+      a1 = createClusteredServer("A_1", A_1_PORT, A_2_PORT, B_1_PORT);
+      a2 = createClusteredServer("A_2", A_2_PORT, A_1_PORT, B_2_PORT);
+
+      a1.start();
+      a2.start();
+
+      b1 = createClusteredServer("B_1", B_1_PORT, B_2_PORT, -1);
+      b2 = createClusteredServer("B_2", B_2_PORT, B_1_PORT, -1);
+
+      b1.start();
+      b2.start();
+   }
+
+   private ActiveMQServer createClusteredServer(String name, int thisPort, int clusterPort, int mirrorPort) throws Exception {
+      ActiveMQServer server = createServer(thisPort, false);
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST).addQueueConfig(new QueueConfiguration(QUEUE_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST)));
+      server.getConfiguration().addAddressConfiguration(new CoreAddressConfiguration().setName(TOPIC_NAME).addRoutingType(RoutingType.MULTICAST));
+      server.getConfiguration().clearAddressSettings();
+      server.getConfiguration().addAddressSetting("#", new AddressSettings().setRedistributionDelay(0));
+
+
+      server.setIdentity(name);
+      server.getConfiguration().setName("node_1").setHAPolicyConfiguration(new LiveOnlyPolicyConfiguration()).addConnectorConfiguration("thisNode", "tcp://localhost:" + thisPort).addConnectorConfiguration("otherNode", "tcp://localhost:" + clusterPort);
+
+      ClusterConnectionConfiguration clusterConfiguration = new ClusterConnectionConfiguration().setName("cluster").setConnectorName("thisNode").setMessageLoadBalancingType(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION).setStaticConnectors(Collections.singletonList("otherNode"));
+      server.getConfiguration().addClusterConfiguration(clusterConfiguration);
+
+      if (mirrorPort > 0) {
+         server.getConfiguration().addAMQPConnection(new AMQPBrokerConnectConfiguration("myMirror" + mirrorPort, "tcp://localhost:" + mirrorPort).setReconnectAttempts(-1).setRetryInterval(100).addConnectionElement(new AMQPMirrorBrokerConnectionElement().setDurable(true).setMirrorSNF(new SimpleString("$ACTIVEMQ_ARTEMIS_MIRROR_MirrorTowards_" + mirrorPort))));
+      }
+
+      return server;
+   }
+
+   @Test
+   public void testQueueRedistributionAMQP() throws Exception {
+      internalQueueRedistribution("AMQP");
+   }
+
+   @Test
+   public void testQueueRedistributionCORE() throws Exception {
+      internalQueueRedistribution("CORE");
+   }
+
+   public void internalQueueRedistribution(String protocol) throws Exception {
+      AssertionLoggerHandler.startCapture();
+      runAfter((AssertionLoggerHandler::stopCapture));
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
+         for (int i = 0; i < 100; i++) {
+            producer.send(session.createTextMessage("Hello" + i));
+         }
+      }
+
+      try (Connection connA1 = cfA1.createConnection();
+           Connection connA2 = cfA2.createConnection()) {
+
+         connA1.start();
+         connA2.start();
+
+         Session sessionA1 = connA1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session sessionA2 = connA2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+         for (int i = 0; i < 100; i++) {
+            MessageConsumer consumer;
+            String place;
+            if (i % 2 == 0) {
+               place = "A1";
+               consumer = sessionA1.createConsumer(sessionA1.createQueue(QUEUE_NAME));
+            } else {
+               place = "A2";
+               consumer = sessionA2.createConsumer(sessionA1.createQueue(QUEUE_NAME));

Review Comment:
   If this is actually creating distinct destination objects rather than reusing one, it should probably use the matching session for clarity (it mixes sessionA2 and sessionA1).



##########
tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java:
##########
@@ -464,6 +465,21 @@ public void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBala
 
       }
 
+      @Override
+      public Binding getBinding(String name) {
+         return null;

Review Comment:
   Even if it's BindingsFake, it seems weird not to implement this when the other methods are.
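
   A minimal sketch of what a non-null implementation could look like in a test fake, assuming the fake keeps its bindings in a map keyed by unique name. `SketchBinding` and `SketchBindingsFake` are hypothetical stand-ins for illustration, not the classes in WildcardAddressManagerUnitTest:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical minimal interface; the broker's real Binding has many more methods.
interface SketchBinding {
   String getUniqueName();
}

// Hypothetical test fake that tracks bindings by their unique name.
final class SketchBindingsFake {
   private final Map<String, SketchBinding> bindings = new LinkedHashMap<>();

   void addBinding(SketchBinding binding) {
      bindings.put(binding.getUniqueName(), binding);
   }

   // Look the binding up by name instead of returning null unconditionally.
   // Still returns null when no binding with that name was added.
   SketchBinding getBinding(String name) {
      return bindings.get(name);
   }
}
```

   With something along these lines, a lookup by name returns whatever was added earlier, which keeps the fake consistent with its other implemented methods.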





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174066387


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   Even just nesting it in RoutingContext would seem nicer. RoutingContext.MirrorControl{enabled, disabled, individual}?
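
   For illustration, the nesting suggested here might look roughly like the following. This is only a sketch: `NestedRoutingContext`, `NestedRoutingContextImpl`, and the accessor names are hypothetical, the constant names come from the suggestion above, and none of it reflects what the PR merged.

```java
// Hypothetical sketch: the enum lives inside the context type it configures.
interface NestedRoutingContext {

   // Constant names taken from the review suggestion, not from the merged code.
   enum MirrorControl { enabled, disabled, individual }

   MirrorControl getMirrorControl();

   NestedRoutingContext setMirrorControl(MirrorControl control);
}

// Trivial implementation so the sketch compiles and can be exercised.
final class NestedRoutingContextImpl implements NestedRoutingContext {

   // Assumption: mirroring is enabled unless a caller says otherwise.
   private MirrorControl mirrorControl = MirrorControl.enabled;

   @Override
   public MirrorControl getMirrorControl() {
      return mirrorControl;
   }

   @Override
   public NestedRoutingContext setMirrorControl(MirrorControl control) {
      this.mirrorControl = control;
      return this;
   }
}
```

   Callers would then refer to a value as `NestedRoutingContext.MirrorControl.individual`, which ties the option to the context at the cost of a longer name.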





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174964467


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");

Review Comment:
   Unresolving as both comments still seem applicable.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174046217


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   This is for the Mirror option; there's an interface in the broker... it's either disable... I don't think we can give it another name.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174046495


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java:
##########
@@ -707,6 +708,10 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
 
       this.server = server;
 
+      if (queueConfiguration.isInternal()) {
+         this.internalQueue = queueConfiguration.isInternal();
+      }

Review Comment:
   not really... it will be null otherwise.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174130853


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MirrorOption.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server;
+
+/** This is to be used in conjunction with RoutingContext, where we control certain semantics during routing.
+ *  */
+public enum MirrorOption {

Review Comment:
   I prefer it as a separate class, to be honest.
   
   To me, the name Control implies it's doing control. In this case it's really just an option for what the outcome of mirroring is in the current routing.





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174127464


##########
artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java:
##########
@@ -577,6 +599,9 @@ private String debugBindings() {
    private void routeFromCluster(final Message message,
                                  final RoutingContext context,
                                  final byte[] ids) throws Exception {
+      if (!context.isMirrorDisabled()) {
+         context.setMirrorOption(MirrorOption.individualRoute);
+      }

Review Comment:
   ServerSessionImpl will reuse the context over and over. If the individualRoute option is set, it must be reset before the next session.send.
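
   A rough sketch of the concern, assuming a context object that is reused across sends. `MirrorOption` here is a local stand-in enum, and `SketchRoutingContext`, `SketchRouter`, and `resetForNextSend` are hypothetical names used only for illustration; the broker's actual reset logic may differ.

```java
// Local stand-in for illustration; the constant names are assumptions except individualRoute.
enum MirrorOption { enabled, disabled, individualRoute }

// Hypothetical context that a session keeps and reuses for every send.
final class SketchRoutingContext {
   private MirrorOption mirrorOption = MirrorOption.enabled;

   boolean isMirrorDisabled() {
      return mirrorOption == MirrorOption.disabled;
   }

   MirrorOption getMirrorOption() {
      return mirrorOption;
   }

   void setMirrorOption(MirrorOption option) {
      this.mirrorOption = option;
   }

   // Assumed reset hook: drop the per-message individualRoute flag before the
   // next send, but leave an explicit "disabled" setting untouched.
   void resetForNextSend() {
      if (mirrorOption == MirrorOption.individualRoute) {
         mirrorOption = MirrorOption.enabled;
      }
   }
}

final class SketchRouter {
   // Same shape as the diff above: flag cluster-routed messages unless mirroring is off.
   static void routeFromCluster(SketchRoutingContext context) {
      if (!context.isMirrorDisabled()) {
         context.setMirrorOption(MirrorOption.individualRoute);
      }
   }
}
```

   Without a reset of this kind, a context that once handled a cluster-routed message would carry individualRoute into an unrelated later send.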





[GitHub] [activemq-artemis] clebertsuconic commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "clebertsuconic (via GitHub)" <gi...@apache.org>.
clebertsuconic commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1174122204


##########
tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/WildcardAddressManagerUnitTest.java:
##########
@@ -464,6 +465,21 @@ public void setMessageLoadBalancingType(MessageLoadBalancingType messageLoadBala
 
       }
 
+      @Override
+      public Binding getBinding(String name) {
+         return null;

Review Comment:
   that's not really needed, but ok.





[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4443: ARTEMIS-4247 Inconsistencies between Broker Clustering and AMQP Mirror

Posted by "gemmellr (via GitHub)" <gi...@apache.org>.
gemmellr commented on code in PR #4443:
URL: https://github.com/apache/activemq-artemis/pull/4443#discussion_r1176363772


##########
artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java:
##########
@@ -76,14 +77,20 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
    public static final Symbol INTERNAL_ID = Symbol.getSymbol("x-opt-amq-mr-id");
    public static final Symbol INTERNAL_DESTINATION = Symbol.getSymbol("x-opt-amq-mr-dst");
 
+   /** When a clustered node (from regular cluster connections) receives a message
+       it will have target queues associated with it
+      this could be from message redistribution or simply load balancing.
+      an that case this will have the queue associated with it */
+   public static final Symbol TARGET_QUEUES = Symbol.getSymbol("x-opt-amq-trg-q");

Review Comment:
   Somehow I missed your other PR yesterday, even though I did look for a commit/PR first... unsure how I didn't see it.


