You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/17 13:12:41 UTC

[qpid-broker-j] branch 7.1.x updated: QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/7.1.x by this push:
     new 4f3f273  QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
4f3f273 is described below

commit 4f3f27312d08a69df4f6d87ac048439ebe516256
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 17 13:36:22 2019 +0100

    QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
    
    (cherry picked from commit 5d295c0afe382140ef40b95b4ba9b91454d805c1)
---
 .../server/virtualhost/AbstractVirtualHost.java    |   7 +-
 .../autocreation/NodeAutoCreationPolicyTest.java   | 129 ++++++++++++++++++---
 2 files changed, 116 insertions(+), 20 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 7642ad0..856a627 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1350,11 +1350,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                         }
 
                     }
+                    catch (AbstractConfiguredObject.DuplicateNameException e)
+                    {
+                        return (T)e.getExisting();
+                    }
                     catch (RuntimeException e)
                     {
-                        LOGGER.info("Unable to auto create a node named {} due to exception", name, e);
+                        LOGGER.info("Unable to auto create a node named '{}' due to exception", name, e);
                     }
-
                 }
             }
 
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java
index 6e36749..c192895 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java
@@ -32,10 +32,17 @@ import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -63,6 +70,10 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
     private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ";
     private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE";
     private static final String AUTO_CREATION_POLICIES = createAutoCreationPolicies();
+    private static final String TEST_MESSAGE = "Hello world!";
+    private static final String VALID_QUEUE_NAME = "fooQueue";
+    private static final String TYPE_QUEUE = "queue";
+    private static final String TYPE_TOPIC = "topic";
 
 
     private static String createAutoCreationPolicies()
@@ -227,17 +238,15 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
         {
             connection.start();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final Queue queue = session.createQueue(getProtocol() == Protocol.AMQP_1_0
-                                                             ? "fooQueue"
-                                                             : "ADDR: fooQueue ; { assert: never, node: { type: queue } }");
+            final Queue queue = session.createQueue(getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE));
             final MessageProducer producer = session.createProducer(queue);
-            producer.send(session.createTextMessage("Hello world!"));
+            producer.send(session.createTextMessage(TEST_MESSAGE));
 
             final MessageConsumer consumer = session.createConsumer(queue);
             Message received = consumer.receive(getReceiveTimeout());
             assertNotNull(received);
             assertTrue(received instanceof TextMessage);
-            assertEquals("Hello world!", ((TextMessage) received).getText());
+            assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
         }
         finally
         {
@@ -246,6 +255,89 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
     }
 
     @Test
+    public void testConcurrentQueueCreation() throws Exception // several publishers send to the same queue address concurrently, then all messages are consumed
+    {
+        updateAutoCreationPolicies(); // helper defined outside this hunk; presumably installs AUTO_CREATION_POLICIES on the virtual host - confirm
+
+        final String destination = getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE); // protocol-specific address for "fooQueue"
+        final int numberOfActors = 3; // number of concurrent publishers, each with its own connection and session
+        final Connection[] connections = new Connection[numberOfActors];
+        try
+        {
+            final Session[] sessions = new Session[numberOfActors];
+            for (int i = 0; i < numberOfActors; i++)
+            {
+                final Connection connection = getConnection();
+                connections[i] = connection; // recorded before the session is created so the outer finally can close it even if createSession throws
+                sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+
+            final List<CompletableFuture<MessageProducer>> futures = new ArrayList<>(numberOfActors);
+            final ExecutorService executorService = Executors.newFixedThreadPool(numberOfActors); // one thread per publisher so the sends can overlap
+            try
+            {
+                Stream.of(sessions)
+                      .forEach(session -> futures.add(CompletableFuture.supplyAsync(() -> publishMessage(session,
+                                                                                                         destination),
+                                                                                    executorService))); // submits one publishMessage task per session
+                final CompletableFuture<Void> combinedFuture =
+                        CompletableFuture.allOf(futures.toArray(new CompletableFuture[numberOfActors]));
+                combinedFuture.get(getReceiveTimeout(), TimeUnit.MILLISECONDS); // waits for every publisher; throws if any task failed or the timeout elapses
+            }
+            finally
+            {
+                executorService.shutdown(); // stops accepting new tasks; does not wait for or interrupt tasks still running
+            }
+
+            final Connection connection = getConnection(); // separate connection used only for consuming
+            try
+            {
+                connection.start();
+                final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final Queue queue = session.createQueue(destination);
+                final MessageConsumer consumer = session.createConsumer(queue);
+
+                for (int i = 0; i < numberOfActors; i++) // expects exactly one message per publisher
+                {
+                    Message received = consumer.receive(getReceiveTimeout());
+                    assertNotNull(received);
+                    assertTrue(received instanceof TextMessage);
+                    assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
+                }
+            }
+            finally
+            {
+                connection.close();
+            }
+        }
+        finally
+        {
+            for (Connection connection : connections)
+            {
+                if (connection != null) // slots stay null when setup failed part-way through the loop above
+                {
+                    connection.close();
+                }
+            }
+        }
+    }
+
+    private MessageProducer publishMessage(final Session session, final String destination) // sends one TEST_MESSAGE to the given queue address and returns the producer used
+    {
+        try
+        {
+            final Queue queue = session.createQueue(destination);
+            final MessageProducer producer = session.createProducer(queue); // NOTE(review): presumably the point at which the broker auto-creates the queue - confirm
+            producer.send(session.createTextMessage(TEST_MESSAGE));
+            return producer;
+        }
+        catch (JMSException e)
+        {
+            throw new IllegalStateException(e); // rethrown unchecked (cause preserved) because the supplyAsync lambda that calls this cannot throw a checked JMSException
+        }
+    }
+
+    @Test
     public void testSendingToNonMatchingQueuePattern() throws Exception
     {
         updateAutoCreationPolicies();
@@ -254,9 +346,7 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
         try
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final Queue queue = session.createQueue(getProtocol() == Protocol.AMQP_1_0
-                                                            ? "foQueue"
-                                                            : "ADDR: foQueue ; { assert: never, node: { type: queue } }");
+            final Queue queue = session.createQueue(getDestinationAddress("foQueue", TYPE_QUEUE));
             try
             {
                 session.createProducer(queue);
@@ -283,11 +373,9 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
         {
             connection.start();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final Topic topic = session.createTopic(getProtocol() == Protocol.AMQP_1_0
-                                                            ? "barExchange/foo"
-                                                            : "ADDR: barExchange/foo ; { assert: never, node: { type: topic } }");
+            final Topic topic = session.createTopic(getDestinationAddress("barExchange/foo", TYPE_TOPIC));
             final MessageProducer producer = session.createProducer(topic);
-            producer.send(session.createTextMessage("Hello world!"));
+            producer.send(session.createTextMessage(TEST_MESSAGE));
 
             final MessageConsumer consumer = session.createConsumer(topic);
             Message received = consumer.receive(getReceiveTimeout() / 4);
@@ -316,9 +404,7 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
         try
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            final Topic topic = session.createTopic(getProtocol() == Protocol.AMQP_1_0
-                                                            ? "baa"
-                                                            : "ADDR: baa ; { assert: never, node: { type: topic } }");
+            final Topic topic = session.createTopic(getDestinationAddress("baa", TYPE_TOPIC));
             try
             {
                 session.createProducer(topic);
@@ -350,13 +436,13 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             final Queue queue = session.createQueue("BURL:direct:///fooQ/fooQ");
             final MessageProducer producer = session.createProducer(queue);
-            producer.send(session.createTextMessage("Hello world!"));
+            producer.send(session.createTextMessage(TEST_MESSAGE));
 
             final MessageConsumer consumer = session.createConsumer(queue);
             Message received = consumer.receive(getReceiveTimeout());
             assertNotNull(received);
             assertTrue(received instanceof TextMessage);
-            assertEquals("Hello world!", ((TextMessage) received).getText());
+            assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
         }
         finally
         {
@@ -380,7 +466,7 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
             try
             {
                 final MessageProducer producer = session.createProducer(queue);
-                producer.send(session.createTextMessage("Hello world!"));
+                producer.send(session.createTextMessage(TEST_MESSAGE));
 
                 fail("Sending a message should fail");
             }
@@ -467,4 +553,11 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
         }
         return actualAlternateBindingMap;
     }
+
+    private String getDestinationAddress(final String name, final String type) // builds the destination string passed to createQueue/createTopic for the current protocol
+    {
+        return getProtocol() == Protocol.AMQP_1_0
+                ? name // AMQP 1.0: the bare node name is used unchanged
+                : String.format("ADDR: %s; { assert: never, node: { type: %s } }", name, type); // other protocols: "ADDR:" string with the node type; NOTE(review): the literals this replaces had a space before ';' - confirm the address parser accepts both forms
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org