You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/10/29 15:17:54 UTC

[activemq-artemis] 03/04: ARTEMIS-2966 Anycast queues with distinct names would cause issues on sending messages

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit d51c89471eb07ea21feffb6afb5edc58abbcfc6b
Author: Clebert Suconic <cl...@apache.org>
AuthorDate: Wed Oct 28 12:05:32 2020 -0400

    ARTEMIS-2966 Anycast queues with distinct names would cause issues on sending messages
---
 .../protocol/amqp/broker/AMQPSessionCallback.java  |  7 +-
 .../amqp/AmqpAnyCastDistinctQueueTest.java         | 75 ++++++++++++++++++++++
 .../amqp/AmqpFullyQualifiedNameTest.java           |  8 ++-
 .../amqp/connect/QpidDispatchPeerTest.java         | 30 ++++++---
 4 files changed, 108 insertions(+), 12 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index cc5616a..9d165d7 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
 import org.apache.activemq.artemis.core.paging.PagingStore;
 import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.core.security.SecurityAuth;
 import org.apache.activemq.artemis.core.server.AddressQueryResult;
@@ -346,7 +347,11 @@ public class AMQPSessionCallback implements SessionCallback {
          }
       } else if (routingType == RoutingType.ANYCAST) {
          if (manager.getServer().locateQueue(unPrefixedAddress) == null) {
-            if (addressSettings.isAutoCreateQueues()) {
+            Bindings bindings = manager.getServer().getPostOffice().lookupBindingsForAddress(address);
+            if (bindings != null) {
+               // this means the address has another queue with a different name, which is fine, we just ignore it on this case
+               result = true;
+            } else if (addressSettings.isAutoCreateQueues()) {
                try {
                   serverSession.createQueue(new QueueConfiguration(address).setRoutingType(routingType).setAutoCreated(true));
                } catch (ActiveMQQueueExistsException e) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java
new file mode 100644
index 0000000..dd08bd1
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpAnyCastDistinctQueueTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.amqp;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.artemis.api.core.QueueConfiguration;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Make sure auto create will not create a queue when the queue and its address have a distinct name in anycast.
+ */
+public class AmqpAnyCastDistinctQueueTest extends AmqpClientTestSupport {
+
+   @Override
+   protected void addAdditionalAcceptors(ActiveMQServer server) throws Exception {
+   }
+
+   @Test(timeout = 60000)
+   public void testDistinctQueueAddressAnyCast() throws Exception {
+      String ADDRESS_NAME = "DISTINCT_ADDRESS_testDistinctAddressAnyCast";
+      String QUEUE_NAME = "DISTINCT_QUEUE_testDistinctQUEUE_AnyCast";
+      server.addAddressInfo(new AddressInfo(ADDRESS_NAME).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
+      server.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(ADDRESS_NAME).setDurable(true).setRoutingType(RoutingType.ANYCAST));
+
+      final int NUMBER_OF_MESSAGES = 100;
+
+      ConnectionFactory jmsCF = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
+      try (Connection connection = jmsCF.createConnection()) {
+         Session session = connection.createSession();
+         Queue queueSending = session.createQueue(ADDRESS_NAME);
+         MessageProducer producer = session.createProducer(queueSending);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+         }
+
+         Queue queueReceiving = session.createQueue(QUEUE_NAME);
+         connection.start();
+         MessageConsumer consumer = session.createConsumer(queueReceiving);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            TextMessage message = (TextMessage)consumer.receive(5000);
+            Assert.assertNotNull(message);
+            Assert.assertEquals("hello " + i, message.getText());
+         }
+         Assert.assertNull(consumer.receiveNoWait());
+      }
+
+   }
+}
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
index ba1a3aa..3dea975 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpFullyQualifiedNameTest.java
@@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.CompositeAddress;
@@ -278,7 +279,12 @@ public class AmqpFullyQualifiedNameTest extends JMSClientTestSupport {
 
    @Test
    public void testQueue() throws Exception {
-      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true));
+      server.getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false));
+
+      server.addAddressInfo(new AddressInfo(anycastAddress).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false));
+      server.createQueue(new QueueConfiguration(anycastQ1).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+      server.createQueue(new QueueConfiguration(anycastQ2).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
+      server.createQueue(new QueueConfiguration(anycastQ3).setAddress(anycastAddress).setRoutingType(RoutingType.ANYCAST).setDurable(true));
 
       Connection connection = createConnection();
       try {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
index 100e1c9..507b3d3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java
@@ -82,16 +82,21 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60_000)
+   public void testWithMatchingDifferentNamesOnQueue() throws Exception {
+      internalMultipleQueues(true, true);
+   }
+
+   @Test(timeout = 60_000)
    public void testWithMatching() throws Exception {
-      internalMultipleQueues(true);
+      internalMultipleQueues(true, false);
    }
 
    @Test(timeout = 60_000)
    public void testwithQueueName() throws Exception {
-      internalMultipleQueues(false);
+      internalMultipleQueues(false, true);
    }
 
-   private void internalMultipleQueues(boolean useMatching) throws Exception {
+   private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception {
       final int numberOfMessages = 100;
       final int numberOfQueues = 10;
       AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24621").setRetryInterval(10).setReconnectAttempts(-1);
@@ -99,14 +104,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
          amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER));
       } else {
          for (int i = 0; i < numberOfQueues; i++) {
-            amqpConnection.addElement(new AMQPBrokerConnectionElement().setQueueName("queue.test" + i).setType(AMQPBrokerConnectionAddressType.PEER));
+            amqpConnection.addElement(new AMQPBrokerConnectionElement().setQueueName(createQueueName(i, distinctNaming)).setType(AMQPBrokerConnectionAddressType.PEER));
          }
       }
       server.getConfiguration().addAMQPConnection(amqpConnection);
       server.start();
       for (int i = 0; i < numberOfQueues; i++) {
          server.addAddressInfo(new AddressInfo("queue.test" + i).addRoutingType(RoutingType.ANYCAST).setAutoCreated(false).setTemporary(false));
-         server.createQueue(new QueueConfiguration("queue.test" + i).setAddress("queue.test" + i).setRoutingType(RoutingType.ANYCAST));
+         server.createQueue(new QueueConfiguration(createQueueName(i, distinctNaming)).setAddress("queue.test" + i).setRoutingType(RoutingType.ANYCAST));
       }
 
       for (int dest = 0; dest < numberOfQueues; dest++) {
@@ -120,7 +125,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
          MessageProducer producer = session.createProducer(queue);
          producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
-         org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue("queue.test" + dest);
+         org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming));
 
          for (int i = 0; i < numberOfMessages; i++) {
             producer.send(session.createTextMessage("hello " + i));
@@ -130,8 +135,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
          connection.close();
       }
 
-      System.out.println("*******************************************************************************************************************************");
-      System.out.println("Creating consumer");
 
       for (int dest = 0; dest < numberOfQueues; dest++) {
          ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622");
@@ -151,7 +154,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
                   System.out.println("*******************************************************************************************************************************");
                }
                Assert.assertNotNull(received);
-               System.out.println("message " + received.getText());
                Assert.assertEquals("hello " + i, received.getText());
             }
             Assert.assertNull(consumer.receiveNoWait());
@@ -162,12 +164,20 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport {
 
             }
          }
-         org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue("queue.test" + dest);
+         org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming));
          Wait.assertEquals(0, testQueueOnServer::getMessageCount);
       }
 
    }
 
+   private String createQueueName(int i, boolean useDistinctName) {
+      if (useDistinctName) {
+         return "distinct.test" + i;
+      } else {
+         return "queue.test" + i;
+      }
+   }
+
    private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer,
                                                 Connection connection) throws InterruptedException {
       for (int i = 0; i < 100; i++) {