You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/03/18 15:52:06 UTC

[activemq-artemis] branch master updated: ARTEMIS-2665 AMQP use createSharedQueue like Core

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new f1fdfc6  ARTEMIS-2665 AMQP use createSharedQueue like Core
     new 33e841c  This closes #3031
f1fdfc6 is described below

commit f1fdfc6857a7d5098a8b17428a184696189ebb07
Author: Michael Pearce <mi...@me.com>
AuthorDate: Wed Mar 18 13:53:38 2020 +0000

    ARTEMIS-2665 AMQP use createSharedQueue like Core
---
 .../protocol/amqp/broker/AMQPSessionCallback.java      |  4 ++--
 .../tests/integration/amqp/JMSSharedConsumerTest.java  | 18 +++++++++++++++---
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index a5d7930..b390aaf 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -264,7 +264,7 @@ public class AMQPSessionCallback implements SessionCallback {
                                         SimpleString queueName,
                                         SimpleString filter) throws Exception {
       try {
-         serverSession.createQueue(address, queueName, routingType, filter, false, true, -1, false, false);
+         serverSession.createSharedQueue(address, queueName, routingType, filter, true, -1, false, false, false);
       } catch (ActiveMQSecurityException se) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
       }
@@ -275,7 +275,7 @@ public class AMQPSessionCallback implements SessionCallback {
                                          SimpleString queueName,
                                          SimpleString filter) throws Exception {
       try {
-         serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true);
+         serverSession.createSharedQueue(address, queueName, routingType, filter, false, -1, false, false, false);
       } catch (ActiveMQSecurityException se) {
          throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage());
       } catch (ActiveMQQueueExistsException e) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
index 4113e4e..537d394 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
 
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -29,6 +28,8 @@ import javax.jms.Topic;
 import java.util.Arrays;
 import java.util.Collection;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -58,7 +59,11 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
       return "AMQP,OPENWIRE,CORE";
    }
 
-   private void testSharedConsumer(Connection connection1, Connection connection2) throws JMSException {
+   private void testSharedConsumer(Connection connection1, Connection connection2) throws Exception {
+      testSharedConsumer(connection1, connection2, false);
+   }
+
+   private void testSharedConsumer(Connection connection1, Connection connection2, boolean amqpQueueName) throws Exception {
       try {
          Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
          Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -89,6 +94,13 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
          }
          assertNotNull("Should have received a message by now.", received);
          assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
+
+         String consumerQueueName = "nonDurable.SharedConsumer";
+         if (amqpQueueName) {
+            consumerQueueName = "SharedConsumer:shared-volatile:global";
+         }
+         QueueBinding queueBinding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(consumerQueueName));
+         assertTrue(queueBinding.getQueue().isTemporary());
       } finally {
          connection1.close();
          connection2.close();
@@ -100,7 +112,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
       Connection connection = createConnection(); //AMQP
       Connection connection2 = createConnection(); //AMQP
 
-      testSharedConsumer(connection, connection2);
+      testSharedConsumer(connection, connection2, !amqpUseCoreSubscriptionNaming);
    }
 
    @Test(timeout = 30000)