You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/08/19 20:42:07 UTC

[activemq-artemis] branch main updated: ARTEMIS-3918 support FQQN + anycast + redistribution

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 86db53da9a ARTEMIS-3918 support FQQN + anycast + redistribution
86db53da9a is described below

commit 86db53da9acb96ed81b38fe3962aa0532047898a
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jul 29 16:12:53 2022 -0500

    ARTEMIS-3918 support FQQN + anycast + redistribution
    
    When a message is sent to an anycast queue via FQQN on one node of a
    cluster, and a consumer is then created on that same anycast queue via
    FQQN on another node in the cluster, the message is not redistributed
    to the node with the consumer.
    
    This commit fixes this use case, primarily by including the FQQN
    information in the notification messages sent to other nodes in the
    cluster.
---
 .../artemis/core/postoffice/impl/BindingsImpl.java |   2 +-
 .../core/postoffice/impl/PostOfficeImpl.java       |   3 +-
 .../server/cluster/impl/ClusterConnectionImpl.java |   5 +-
 .../core/server/impl/ServerSessionImpl.java        |   2 +-
 .../distribution/MessageRedistributionTest.java    | 119 +++++++++++++++++++--
 5 files changed, 120 insertions(+), 11 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 13c95ddcc0..6272971592 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -192,7 +192,7 @@ public final class BindingsImpl implements Bindings {
          logger.tracef("Redistributing message %s", message);
       }
 
-      final SimpleString routingName = originatingQueue.getName();
+      final SimpleString routingName = CompositeAddress.isFullyQualified(message.getAddress()) && originatingQueue.getRoutingType() == RoutingType.ANYCAST ? CompositeAddress.extractAddressName(message.getAddressSimpleString()) : originatingQueue.getName();
 
       final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 6beb348023..bef63159ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -415,7 +415,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                      return;
                   }
 
-                  Binding binding = getBinding(queueName);
+                  SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
+                  Binding binding = getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName);
 
                   if (binding != null) {
                      // We have a local queue
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 902071652e..60d3a9eaf7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.server.group.impl.Response;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.apache.activemq.artemis.utils.ExecutorFactory;
 import org.apache.activemq.artemis.utils.FutureLatch;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -1426,7 +1427,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
          // Need to propagate the consumer add
          TypedProperties props = new TypedProperties();
 
-         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+         SimpleString addressName = message.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
+
+         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(addressName) ? addressName : binding.getAddress());
 
          props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 5057d362e1..a446d93aea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -580,7 +580,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
       if (!browseOnly) {
          TypedProperties props = new TypedProperties();
 
-         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
+         props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(unPrefixedQueueName) ? unPrefixedQueueName : address);
 
          props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index 54d3ab8367..10bf026796 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.artemis.tests.integration.cluster.distribution;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.transaction.xa.XAResource;
 import javax.transaction.xa.Xid;
 import java.nio.ByteBuffer;
@@ -30,7 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientProducer;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
-
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Bindable;
 import org.apache.activemq.artemis.core.server.Queue;
@@ -40,7 +46,9 @@ import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu
 import org.apache.activemq.artemis.core.server.impl.QueueImpl;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.jboss.logging.Logger;
 import org.junit.Assert;
 import org.junit.Before;
@@ -68,8 +76,6 @@ public class MessageRedistributionTest extends ClusterTestBase {
       return false;
    }
 
-
-
    @Override
    protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) {
       super.setSessionFactoryCreateLocator(node, ha, serverTotc);
@@ -78,7 +84,6 @@ public class MessageRedistributionTest extends ClusterTestBase {
 
    }
 
-
    //https://issues.jboss.org/browse/HORNETQ-1061
    @Test
    public void testRedistributionWithMessageGroups() throws Exception {
@@ -728,6 +733,102 @@ public class MessageRedistributionTest extends ClusterTestBase {
       verifyNotReceive(1);
    }
 
+   @Test
+   public void testRedistributionWithFqqnAnycast() throws Exception {
+      internalTestRedistributionWithFqqn(RoutingType.ANYCAST);
+   }
+
+   @Test
+   public void testRedistributionWithFqqnMulticast() throws Exception {
+      internalTestRedistributionWithFqqn(RoutingType.MULTICAST);
+   }
+
+   private void internalTestRedistributionWithFqqn(RoutingType routingType) throws Exception {
+      final String ADDRESS = "myAddress";
+      final String QUEUE = "myQueue";
+      final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE);
+      AddressSettings as = new AddressSettings().setRedistributionDelay(0);
+      getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as);
+      getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as);
+      setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      createQueue(0, ADDRESS, QUEUE, null, false, routingType);
+      createQueue(0, ADDRESS, "extra", null, false, routingType);
+      waitForBindings(0, ADDRESS, 2, 0, true);
+      waitForBindings(1, ADDRESS, 2, 0, false);
+
+      send(0, FQQN, 20, false, null, routingType, null);
+
+      setupSessionFactory(1, isNetty());
+      createQueue(1, ADDRESS, QUEUE, null, false, routingType);
+      waitForBindings(0, ADDRESS, 1, 0, false);
+      waitForBindings(1, ADDRESS, 1, 0, true);
+
+      addConsumer(1, 1, FQQN, null);
+      waitForBindings(1, ADDRESS, 1, 1, true);
+
+      verifyReceiveAll(20, 1);
+      verifyNotReceive(1);
+   }
+
+   @Test
+   public void testRedistributionWithFqqnJmsQueue() throws Exception {
+      final String ADDRESS = "myAddress";
+      final String QUEUE = "myQueue";
+      final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE);
+
+      AddressSettings as = new AddressSettings().setRedistributionDelay(0);
+      getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as);
+      getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as);
+      setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND);
+      getServer(0).getConfiguration().setName("0");
+      getServer(1).getConfiguration().setName("1");
+      startServers(0, 1);
+
+      ConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://0");
+      ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://1");
+
+      try (Connection connection0 = cf0.createConnection();
+           Connection connection1 = cf1.createConnection()) {
+         javax.jms.Queue sendTo = ActiveMQJMSClient.createQueue(FQQN);
+         javax.jms.Queue consumeFrom = ActiveMQJMSClient.createQueue(FQQN);
+
+         setupSessionFactory(0, isNetty());
+         createQueue(0, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST);
+         waitForBindings(0, ADDRESS, 1, 0, true);
+         waitForBindings(1, ADDRESS, 1, 0, false);
+
+         Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = session0.createProducer(sendTo);
+
+         final int numMessages = 10;
+         for (int i = 0; i < numMessages; i++) {
+            TextMessage message = session0.createTextMessage("This is text message " + i);
+            producer.send(message);
+         }
+         producer.close();
+         assertEquals(numMessages, servers[0].locateQueue(QUEUE).getMessageCount());
+
+         setupSessionFactory(1, isNetty());
+         createQueue(1, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST);
+         waitForBindings(1, ADDRESS, 1, 0, true);
+         waitForBindings(0, ADDRESS, 1, 0, false);
+
+         Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         connection1.start();
+         MessageConsumer consumer1 = session1.createConsumer(consumeFrom);
+         waitForBindings(1, ADDRESS, 1, 1, true);
+         javax.jms.Message message1;
+         for (int i = 0; i < numMessages; i++) {
+            message1 = consumer1.receive(5000);
+            assertNotNull(message1);
+         }
+      }
+   }
+
    @Test
    public void testRedistributionWhenRemoteConsumerIsAddedLbOffWithRedistribution() throws Exception {
       setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
@@ -1293,11 +1394,15 @@ public class MessageRedistributionTest extends ClusterTestBase {
    }
 
    protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
-      setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2);
+      setupCluster("queues", messageLoadBalancingType);
+   }
+
+   protected void setupCluster(final String address, final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+      setupClusterConnection("cluster0", address, messageLoadBalancingType, 1, isNetty(), 0, 1, 2);
 
-      setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2);
+      setupClusterConnection("cluster1", address, messageLoadBalancingType, 1, isNetty(), 1, 0, 2);
 
-      setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1);
+      setupClusterConnection("cluster2", address, messageLoadBalancingType, 1, isNetty(), 2, 0, 1);
    }
 
    protected void setRedistributionDelay(final long delay) {