You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/08/19 20:42:07 UTC
[activemq-artemis] branch main updated: ARTEMIS-3918 support FQQN + anycast + redistribution
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 86db53da9a ARTEMIS-3918 support FQQN + anycast + redistribution
86db53da9a is described below
commit 86db53da9acb96ed81b38fe3962aa0532047898a
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Fri Jul 29 16:12:53 2022 -0500
ARTEMIS-3918 support FQQN + anycast + redistribution
When a message is sent to an anycast queue via FQQN on one node of a
cluster and then a consumer is created on that same anycast queue via
FQQN on another node in the cluster the message is not redistributed to
the node with the consumer.
This commit fixes this use-case primarily by including the FQQN info in
the notification messages sent to other nodes in the cluster.
---
.../artemis/core/postoffice/impl/BindingsImpl.java | 2 +-
.../core/postoffice/impl/PostOfficeImpl.java | 3 +-
.../server/cluster/impl/ClusterConnectionImpl.java | 5 +-
.../core/server/impl/ServerSessionImpl.java | 2 +-
.../distribution/MessageRedistributionTest.java | 119 +++++++++++++++++++--
5 files changed, 120 insertions(+), 11 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
index 13c95ddcc0..6272971592 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/BindingsImpl.java
@@ -192,7 +192,7 @@ public final class BindingsImpl implements Bindings {
logger.tracef("Redistributing message %s", message);
}
- final SimpleString routingName = originatingQueue.getName();
+ final SimpleString routingName = CompositeAddress.isFullyQualified(message.getAddress()) && originatingQueue.getRoutingType() == RoutingType.ANYCAST ? CompositeAddress.extractAddressName(message.getAddressSimpleString()) : originatingQueue.getName();
final Pair<Binding[], CopyOnWriteBindings.BindingIndex> bindingsAndPosition = routingNameBindingMap.getBindings(routingName);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 6beb348023..bef63159ed 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -415,7 +415,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
return;
}
- Binding binding = getBinding(queueName);
+ SimpleString addressName = props.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
+ Binding binding = getBinding(CompositeAddress.isFullyQualified(addressName) ? addressName : queueName);
if (binding != null) {
// We have a local queue
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 902071652e..60d3a9eaf7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -71,6 +71,7 @@ import org.apache.activemq.artemis.core.server.group.impl.Response;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -1426,7 +1427,9 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
// Need to propagate the consumer add
TypedProperties props = new TypedProperties();
- props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+ SimpleString addressName = message.getSimpleStringProperty(ManagementHelper.HDR_ADDRESS);
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(addressName) ? addressName : binding.getAddress());
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 5057d362e1..a446d93aea 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -580,7 +580,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (!browseOnly) {
TypedProperties props = new TypedProperties();
- props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, address);
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, CompositeAddress.isFullyQualified(unPrefixedQueueName) ? unPrefixedQueueName : address);
props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
index 54d3ab8367..10bf026796 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/MessageRedistributionTest.java
@@ -16,6 +16,12 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster.distribution;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.nio.ByteBuffer;
@@ -30,7 +36,7 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
-
+import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Bindable;
import org.apache.activemq.artemis.core.server.Queue;
@@ -40,7 +46,9 @@ import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfigu
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.CompositeAddress;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
@@ -68,8 +76,6 @@ public class MessageRedistributionTest extends ClusterTestBase {
return false;
}
-
-
@Override
protected void setSessionFactoryCreateLocator(int node, boolean ha, TransportConfiguration serverTotc) {
super.setSessionFactoryCreateLocator(node, ha, serverTotc);
@@ -78,7 +84,6 @@ public class MessageRedistributionTest extends ClusterTestBase {
}
-
//https://issues.jboss.org/browse/HORNETQ-1061
@Test
public void testRedistributionWithMessageGroups() throws Exception {
@@ -728,6 +733,102 @@ public class MessageRedistributionTest extends ClusterTestBase {
verifyNotReceive(1);
}
+ @Test
+ public void testRedistributionWithFqqnAnycast() throws Exception {
+ internalTestRedistributionWithFqqn(RoutingType.ANYCAST);
+ }
+
+ @Test
+ public void testRedistributionWithFqqnMulticast() throws Exception {
+ internalTestRedistributionWithFqqn(RoutingType.MULTICAST);
+ }
+
+ private void internalTestRedistributionWithFqqn(RoutingType routingType) throws Exception {
+ final String ADDRESS = "myAddress";
+ final String QUEUE = "myQueue";
+ final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE);
+ AddressSettings as = new AddressSettings().setRedistributionDelay(0);
+ getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as);
+ getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as);
+ setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ createQueue(0, ADDRESS, QUEUE, null, false, routingType);
+ createQueue(0, ADDRESS, "extra", null, false, routingType);
+ waitForBindings(0, ADDRESS, 2, 0, true);
+ waitForBindings(1, ADDRESS, 2, 0, false);
+
+ send(0, FQQN, 20, false, null, routingType, null);
+
+ setupSessionFactory(1, isNetty());
+ createQueue(1, ADDRESS, QUEUE, null, false, routingType);
+ waitForBindings(0, ADDRESS, 1, 0, false);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+
+ addConsumer(1, 1, FQQN, null);
+ waitForBindings(1, ADDRESS, 1, 1, true);
+
+ verifyReceiveAll(20, 1);
+ verifyNotReceive(1);
+ }
+
+ @Test
+ public void testRedistributionWithFqqnJmsQueue() throws Exception {
+ final String ADDRESS = "myAddress";
+ final String QUEUE = "myQueue";
+ final String FQQN = CompositeAddress.toFullyQualified(ADDRESS, QUEUE);
+
+ AddressSettings as = new AddressSettings().setRedistributionDelay(0);
+ getServer(0).getAddressSettingsRepository().addMatch(ADDRESS, as);
+ getServer(1).getAddressSettingsRepository().addMatch(ADDRESS, as);
+ setupCluster(ADDRESS, MessageLoadBalancingType.ON_DEMAND);
+ getServer(0).getConfiguration().setName("0");
+ getServer(1).getConfiguration().setName("1");
+ startServers(0, 1);
+
+ ConnectionFactory cf0 = new ActiveMQConnectionFactory("vm://0");
+ ConnectionFactory cf1 = new ActiveMQConnectionFactory("vm://1");
+
+ try (Connection connection0 = cf0.createConnection();
+ Connection connection1 = cf1.createConnection()) {
+ javax.jms.Queue sendTo = ActiveMQJMSClient.createQueue(FQQN);
+ javax.jms.Queue consumeFrom = ActiveMQJMSClient.createQueue(FQQN);
+
+ setupSessionFactory(0, isNetty());
+ createQueue(0, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST);
+ waitForBindings(0, ADDRESS, 1, 0, true);
+ waitForBindings(1, ADDRESS, 1, 0, false);
+
+ Session session0 = connection0.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session0.createProducer(sendTo);
+
+ final int numMessages = 10;
+ for (int i = 0; i < numMessages; i++) {
+ TextMessage message = session0.createTextMessage("This is text message " + i);
+ producer.send(message);
+ }
+ producer.close();
+ assertEquals(numMessages, servers[0].locateQueue(QUEUE).getMessageCount());
+
+ setupSessionFactory(1, isNetty());
+ createQueue(1, ADDRESS, QUEUE, null, false, RoutingType.ANYCAST);
+ waitForBindings(1, ADDRESS, 1, 0, true);
+ waitForBindings(0, ADDRESS, 1, 0, false);
+
+ Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ connection1.start();
+ MessageConsumer consumer1 = session1.createConsumer(consumeFrom);
+ waitForBindings(1, ADDRESS, 1, 1, true);
+ javax.jms.Message message1;
+ for (int i = 0; i < numMessages; i++) {
+ message1 = consumer1.receive(5000);
+ assertNotNull(message1);
+ }
+ }
+ }
+
@Test
public void testRedistributionWhenRemoteConsumerIsAddedLbOffWithRedistribution() throws Exception {
setupCluster(MessageLoadBalancingType.OFF_WITH_REDISTRIBUTION);
@@ -1293,11 +1394,15 @@ public class MessageRedistributionTest extends ClusterTestBase {
}
protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
- setupClusterConnection("cluster0", "queues", messageLoadBalancingType, 1, isNetty(), 0, 1, 2);
+ setupCluster("queues", messageLoadBalancingType);
+ }
+
+ protected void setupCluster(final String address, final MessageLoadBalancingType messageLoadBalancingType) throws Exception {
+ setupClusterConnection("cluster0", address, messageLoadBalancingType, 1, isNetty(), 0, 1, 2);
- setupClusterConnection("cluster1", "queues", messageLoadBalancingType, 1, isNetty(), 1, 0, 2);
+ setupClusterConnection("cluster1", address, messageLoadBalancingType, 1, isNetty(), 1, 0, 2);
- setupClusterConnection("cluster2", "queues", messageLoadBalancingType, 1, isNetty(), 2, 0, 1);
+ setupClusterConnection("cluster2", address, messageLoadBalancingType, 1, isNetty(), 2, 0, 1);
}
protected void setRedistributionDelay(final long delay) {