Posted to gitbox@activemq.apache.org by "gemmellr (via GitHub)" <gi...@apache.org> on 2023/04/25 11:44:48 UTC

[GitHub] [activemq-artemis] gemmellr commented on a diff in pull request #4445: ARTEMIS-4247 BrokerClustering vs Mirror code improvements

gemmellr commented on code in PR #4445:
URL: https://github.com/apache/activemq-artemis/pull/4445#discussion_r1176379290


##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -356,28 +366,140 @@ public void testRemoteBindingRouting() throws Exception {
 
       Assert.assertEquals(1, remoteQueueBindings_a2.size());
 
-      RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+      RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(RoutingContext.MirrorOption.individualRoute);
 
       Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
       directMessage.setAddress(TOPIC_NAME);
       directMessage.putStringProperty("Test", "t1");
+
+      // we will route a single message to subscription-0. a previous search found the RemoteBinding into remoteQueueBindins_a2;
       remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
       a2.getPostOffice().processRoute(directMessage, routingContext, false);
       routingContext.getTransaction().commit();
 
       for (int i = 0; i < 10; i++) {
          String name = "my-topic-shared-subscription_" + i + ":global";
+
+         if (logger.isDebugEnabled()) {
+            logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
+            logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
+            logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
+            logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
+         }
+
+         // Since we routed to subscription-0 only, the outcome mirroring should only receive the output on subscription-0 on b1;
+         // When the routing happens after a clustered operation, mirror should be done individually to each routed queue.
+         // this test is validating that only subscription-0 got the message on both a1 and b1;
+         // notice that the initial route happened on a2, which then transfered the message towards a1.
+         // a1 made the copy to b1 through mirroring, and only subscription-0 should receive a message.
+         // which is exactly what should happen through message-redistribution in clustering
+
          Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount);
-         logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
-         logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
-         logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
-         logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
          Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
       }
    }
 
+
+
+   // This test is faking a MirrorSend.
+   // First it will send with an empty collection, then to a single queue
+   @Test
+   public void testFakeMirrorSend() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propagated into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
+         if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+      AmqpConnection connection = createAmqpConnection(new URI("tcp://localhost:" + A_1_PORT));
+      runAfter(connection::close);
+      AmqpSession session = connection.createSession();
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(), new ArrayList<>());
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(), a1.getStorageManager().generateID());
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), String.valueOf(b1.getNodeID()));
+
+      AmqpSender sender = session.createSender(mirrorName(A_1_PORT), new Symbol[]{Symbol.getSymbol("amq.mirror")});
+      sender.send(message);
+
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+
+         // all queues should be empty
+         Wait.assertEquals(0, a1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b1.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
+         Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
+      }
+
+      message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      ArrayList<String> singleQueue = new ArrayList<>();
+      singleQueue.add("my-topic-shared-subscription_3:global");
+      singleQueue.add("IDONTEXIST");
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(), singleQueue);
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(), a1.getStorageManager().generateID());
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), String.valueOf(b1.getNodeID())); // simulating a node from b1, so it is not sent bak to b1

Review Comment:
   bak = back



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -356,28 +366,140 @@ public void testRemoteBindingRouting() throws Exception {
 
       Assert.assertEquals(1, remoteQueueBindings_a2.size());
 
-      RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(MirrorOption.individualRoute);
+      RoutingContext routingContext = new RoutingContextImpl(new TransactionImpl(a2.getStorageManager())).setMirrorOption(RoutingContext.MirrorOption.individualRoute);
 
       Message directMessage = new CoreMessage(a2.getStorageManager().generateID(), 512);
       directMessage.setAddress(TOPIC_NAME);
       directMessage.putStringProperty("Test", "t1");
+
+      // we will route a single message to subscription-0. a previous search found the RemoteBinding into remoteQueueBindins_a2;
       remoteQueueBindings_a2.get(0).route(directMessage, routingContext);
       a2.getPostOffice().processRoute(directMessage, routingContext, false);
       routingContext.getTransaction().commit();
 
       for (int i = 0; i < 10; i++) {
          String name = "my-topic-shared-subscription_" + i + ":global";
+
+         if (logger.isDebugEnabled()) {
+            logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
+            logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
+            logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
+            logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
+         }
+
+         // Since we routed to subscription-0 only, the outcome mirroring should only receive the output on subscription-0 on b1;
+         // When the routing happens after a clustered operation, mirror should be done individually to each routed queue.
+         // this test is validating that only subscription-0 got the message on both a1 and b1;
+         // notice that the initial route happened on a2, which then transfered the message towards a1.
+         // a1 made the copy to b1 through mirroring, and only subscription-0 should receive a message.
+         // which is exactly what should happen through message-redistribution in clustering
+
          Wait.assertEquals(i == 0 ? 1 : 0, a1.locateQueue(name)::getMessageCount);
-         logger.debug("a1 queue {} with {} messages", name, a1.locateQueue(name).getMessageCount());
-         logger.debug("b1 queue {} with {} messages", name, b1.locateQueue(name).getMessageCount());
-         logger.debug("a2 queue {} with {} messages", name, a2.locateQueue(name).getMessageCount());
-         logger.debug("b2 queue {} with {} messages", name, b2.locateQueue(name).getMessageCount());
          Wait.assertEquals(i == 0 ? 1 : 0, b1.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, a2.locateQueue(name)::getMessageCount);
          Wait.assertEquals(0, b2.locateQueue(name)::getMessageCount);
       }
    }
 
+
+
+   // This test is faking a MirrorSend.
+   // First it will send with an empty collection, then to a single queue
+   @Test
+   public void testFakeMirrorSend() throws Exception {
+      final String protocol = "AMQP";
+      String subscriptionName = "my-topic-shared-subscription";
+
+      ConnectionFactory cfA1 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_1_PORT);
+      ConnectionFactory cfA2 = CFUtil.createConnectionFactory(protocol, "tcp://localhost:" + A_2_PORT);
+
+      Topic topic;
+
+      try (Connection conn = cfA1.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      try (Connection conn = cfA2.createConnection()) {
+         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         topic = session.createTopic(TOPIC_NAME);
+         for (int i = 0; i < 10; i++) {
+            session.createSharedDurableConsumer(topic, subscriptionName + "_" + i);
+         }
+      }
+
+      Wait.assertTrue(() -> a1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b1.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+      Wait.assertTrue(() -> b2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).size() == 20);
+
+      List<RemoteQueueBinding> remoteQueueBindings_a2 = new ArrayList<>();
+      // making sure the queues created on a2 are propagated into b2
+      a2.getPostOffice().getBindingsForAddress(TOPIC_NAME_SIMPLE_STRING).forEach((a, b) -> {
+         if (b instanceof RemoteQueueBindingImpl && b.getClusterName().toString().startsWith(subscriptionName + "_0")) {
+            logger.debug("{} = {}", a, b);
+            remoteQueueBindings_a2.add((RemoteQueueBinding) b);
+         }
+      });
+
+      Assert.assertEquals(1, remoteQueueBindings_a2.size());
+      AmqpConnection connection = createAmqpConnection(new URI("tcp://localhost:" + A_1_PORT));
+      runAfter(connection::close);
+      AmqpSession session = connection.createSession();
+
+      AmqpMessage message = new AmqpMessage();
+      message.setAddress(TOPIC_NAME);
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.TARGET_QUEUES.toString(), new ArrayList<>());
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.INTERNAL_ID.toString(), a1.getStorageManager().generateID());
+      message.setDeliveryAnnotation(AMQPMirrorControllerSource.BROKER_ID.toString(), String.valueOf(b1.getNodeID()));
+
+      AmqpSender sender = session.createSender(mirrorName(A_1_PORT), new Symbol[]{Symbol.getSymbol("amq.mirror")});
+      sender.send(message);
+
+
+      for (int i = 0; i < 10; i++) {
+         String name = "my-topic-shared-subscription_" + i + ":global";
+
+         // all queues should be empty

Review Comment:
   To make this easier to follow later, specify why as well, e.g. '...because the target queue list was empty'.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java:
##########
@@ -37,35 +36,45 @@ public ActiveMQServerControlUsingCoreTest(boolean legacyCreateQueue) {
    }
 
    @Override
-   @Test
+   @Ignore
    public void testListProducersAgainstServer() throws Exception {
-      // have to disable this test, as it's dealing with producers objects.
-      // the test itself will be using a producer to manage the server.
-      // so the test will include noise and it might fail occasionally
-      Assume.assumeTrue(false);
    }
-   // ActiveMQServerControlTest overrides --------------------------
+
+
+   @Ignore
+   @Override
+   public void testListSessions() throws Exception {
+      // similarly to testListProducersAgainstServer test,
+      // this test will have different objecgs created when running over core,
+      // what may introduce noise to the test

Review Comment:
   objecgs = objects
   
   It's not clear what 'noise' is being referenced or why it matters. The description you removed from the other test was actually clearer: the test creates producers, and these interfere with inspecting producers.



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPRedistributeClusterTest.java:
##########
@@ -42,7 +44,6 @@
 import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.MirrorOption;

Review Comment:
   This is fine, though if you had changed the import to org.apache.activemq.artemis.core.server.RoutingContext.MirrorOption instead, you wouldn't have had to update all the code using it to e.g. "RoutingContext.MirrorOption.individualRoute"; it would have worked as it was before.
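
   A minimal sketch of the nested-type import idea, using the JDK's own Thread.State enum as a stand-in for RoutingContext.MirrorOption so it compiles without Artemis on the classpath. The class name NestedImportSketch and its methods are hypothetical and only for illustration.

```java
import java.lang.Thread.State; // import the nested enum directly

public class NestedImportSketch {

    // Qualified through the outer class, analogous to writing
    // RoutingContext.MirrorOption.individualRoute at every use site.
    static Thread.State qualified() {
        return Thread.State.RUNNABLE;
    }

    // Simple name, available because of the nested-type import above,
    // analogous to keeping MirrorOption.individualRoute unchanged.
    static State simple() {
        return State.RUNNABLE;
    }

    public static void main(String[] args) {
        // Both expressions name the same enum constant.
        System.out.println(qualified() == simple()); // prints true
    }
}
```

   With the nested type imported, the existing call sites could likely have stayed as they were, and only the import line would have changed.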



##########
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java:
##########
@@ -37,35 +36,45 @@ public ActiveMQServerControlUsingCoreTest(boolean legacyCreateQueue) {
    }
 
    @Override
-   @Test
+   @Ignore
    public void testListProducersAgainstServer() throws Exception {
-      // have to disable this test, as it's dealing with producers objects.
-      // the test itself will be using a producer to manage the server.
-      // so the test will include noise and it might fail occasionally
-      Assume.assumeTrue(false);
    }
-   // ActiveMQServerControlTest overrides --------------------------
+
+
+   @Ignore
+   @Override
+   public void testListSessions() throws Exception {
+      // similarly to testListProducersAgainstServer test,

Review Comment:
   Maybe explain it in the first test, testListProducersAgainstServer, as it was originally, and then reference that from this one with just this first sentence. As it stands, this comment references the earlier test, which after these changes has no explanation of why it is ignored.


