You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/18 08:51:10 UTC

[qpid-broker-j] branch 7.0.x updated: QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git


The following commit(s) were added to refs/heads/7.0.x by this push:
     new b886fc4  QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
b886fc4 is described below

commit b886fc4e3f2a7983eb593a88945117b807e01b6b
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 17 13:36:22 2019 +0100

    QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
    
    (cherry picked from commit 5d295c0afe382140ef40b95b4ba9b91454d805c1)
---
 .../server/virtualhost/AbstractVirtualHost.java    |  7 +-
 .../server/queue/NodeAutoCreationPolicyTest.java   | 95 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 2 deletions(-)

diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 3e43819..12f0779 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1349,11 +1349,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
                         }
 
                     }
+                    catch (AbstractConfiguredObject.DuplicateNameException e)
+                    {
+                        return (T)e.getExisting();
+                    }
                     catch (RuntimeException e)
                     {
-                        LOGGER.info("Unable to auto create a node named {} due to exception", name, e);
+                        LOGGER.info("Unable to auto create a node named '{}' due to exception", name, e);
                     }
-
                 }
             }
 
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
index 336ea74..e19e667 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
@@ -21,10 +21,17 @@
 package org.apache.qpid.server.queue;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import javax.jms.Connection;
 import javax.jms.InvalidDestinationException;
@@ -53,6 +60,10 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
 {
     private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ";
     private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE";
+    private static final String TEST_MESSAGE = "Hello world!";
+    private static final String VALID_QUEUE_NAME = "fooQueue";
+    private static final String TYPE_QUEUE = "queue";
+    private static final String TYPE_TOPIC = "topic";
 
     private Connection _connection;
     private Session _session;
@@ -238,6 +249,85 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
         assertEquals("Hello world!", ((TextMessage)received).getText());
     }
 
+    public void testConcurrentQueueCreation() throws Exception
+    {
+        final String destination = getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE);
+        final int numberOfActors = 3;
+        final Connection[] connections = new Connection[numberOfActors];
+        try
+        {
+            final Session[] sessions = new Session[numberOfActors];
+            for (int i = 0; i < numberOfActors; i++)
+            {
+                final Connection connection = getConnection();
+                connections[i] = connection;
+                sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            }
+
+            final List<CompletableFuture<MessageProducer>> futures = new ArrayList<>(numberOfActors);
+            final ExecutorService executorService = Executors.newFixedThreadPool(numberOfActors);
+            try
+            {
+                Stream.of(sessions)
+                      .forEach(session -> futures.add(CompletableFuture.supplyAsync(() -> publishMessage(session,
+                                                                                                         destination),
+                                                                                    executorService)));
+                final CompletableFuture<Void> combinedFuture =
+                        CompletableFuture.allOf(futures.toArray(new CompletableFuture[numberOfActors]));
+                combinedFuture.get(getReceiveTimeout(), TimeUnit.MILLISECONDS);
+            }
+            finally
+            {
+                executorService.shutdown();
+            }
+
+            final Connection connection = getConnection();
+            try
+            {
+                connection.start();
+                final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final Queue queue = session.createQueue(destination);
+                final MessageConsumer consumer = session.createConsumer(queue);
+
+                for (int i = 0; i < numberOfActors; i++)
+                {
+                    Message received = consumer.receive(getReceiveTimeout());
+                    assertNotNull(received);
+                    assertTrue(received instanceof TextMessage);
+                    assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
+                }
+            }
+            finally
+            {
+                connection.close();
+            }
+        }
+        finally
+        {
+            for (Connection connection : connections)
+            {
+                if (connection != null)
+                {
+                    connection.close();
+                }
+            }
+        }
+    }
+
+    private MessageProducer publishMessage(final Session session, final String destination)
+    {
+        try
+        {
+            final Queue queue = session.createQueue(destination);
+            final MessageProducer producer = session.createProducer(queue);
+            producer.send(session.createTextMessage(TEST_MESSAGE));
+            return producer;
+        }
+        catch (JMSException e)
+        {
+            throw new IllegalStateException(e);
+        }
+    }
 
     public void testSendingToNonMatchingQueuePattern() throws Exception
     {
@@ -438,4 +528,9 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
         }
         return actualAlternateBindingMap;
     }
+
+    private String getDestinationAddress(final String name, final String type)
+    {
+        return isBroker10() ? name : String.format("ADDR: %s; { assert: never, node: { type: %s } }", name, type);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org