You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/18 08:51:10 UTC
[qpid-broker-j] branch 7.0.x updated: QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.0.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/7.0.x by this push:
new b886fc4 QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
b886fc4 is described below
commit b886fc4e3f2a7983eb593a88945117b807e01b6b
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 17 13:36:22 2019 +0100
QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
(cherry picked from commit 5d295c0afe382140ef40b95b4ba9b91454d805c1)
---
.../server/virtualhost/AbstractVirtualHost.java | 7 +-
.../server/queue/NodeAutoCreationPolicyTest.java | 95 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 2 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 3e43819..12f0779 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1349,11 +1349,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ catch (AbstractConfiguredObject.DuplicateNameException e)
+ {
+ return (T)e.getExisting();
+ }
catch (RuntimeException e)
{
- LOGGER.info("Unable to auto create a node named {} due to exception", name, e);
+ LOGGER.info("Unable to auto create a node named '{}' due to exception", name, e);
}
-
}
}
diff --git a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
index 336ea74..e19e667 100644
--- a/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
+++ b/systests/src/test/java/org/apache/qpid/server/queue/NodeAutoCreationPolicyTest.java
@@ -21,10 +21,17 @@
package org.apache.qpid.server.queue;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.InvalidDestinationException;
@@ -53,6 +60,10 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
{
private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ";
private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE";
+ private static final String TEST_MESSAGE = "Hello world!";
+ private static final String VALID_QUEUE_NAME = "fooQueue";
+ private static final String TYPE_QUEUE = "queue";
+ private static final String TYPE_TOPIC = "topic";
private Connection _connection;
private Session _session;
@@ -238,6 +249,85 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
assertEquals("Hello world!", ((TextMessage)received).getText());
}
+ public void testConcurrentQueueCreation() throws Exception
+ {
+ final String destination = getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE);
+ final int numberOfActors = 3;
+ final Connection[] connections = new Connection[numberOfActors];
+ try
+ {
+ final Session[] sessions = new Session[numberOfActors];
+ for (int i = 0; i < numberOfActors; i++)
+ {
+ final Connection connection = getConnection();
+ connections[i] = connection;
+ sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ final List<CompletableFuture<MessageProducer>> futures = new ArrayList<>(numberOfActors);
+ final ExecutorService executorService = Executors.newFixedThreadPool(numberOfActors);
+ try
+ {
+ Stream.of(sessions)
+ .forEach(session -> futures.add(CompletableFuture.supplyAsync(() -> publishMessage(session,
+ destination),
+ executorService)));
+ final CompletableFuture<Void> combinedFuture =
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[numberOfActors]));
+ combinedFuture.get(getReceiveTimeout(), TimeUnit.MILLISECONDS);
+ }
+ finally
+ {
+ executorService.shutdown();
+ }
+
+ final Connection connection = getConnection();
+ try
+ {
+ connection.start();
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue queue = session.createQueue(destination);
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < numberOfActors; i++)
+ {
+ Message received = consumer.receive(getReceiveTimeout());
+ assertNotNull(received);
+ assertTrue(received instanceof TextMessage);
+ assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+ finally
+ {
+ for (Connection connection : connections)
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ private MessageProducer publishMessage(final Session session, final String destination)
+ {
+ try
+ {
+ final Queue queue = session.createQueue(destination);
+ final MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage(TEST_MESSAGE));
+ return producer;
+ }
+ catch (JMSException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
public void testSendingToNonMatchingQueuePattern() throws Exception
{
@@ -438,4 +528,9 @@ public class NodeAutoCreationPolicyTest extends QpidBrokerTestCase
}
return actualAlternateBindingMap;
}
+
+ private String getDestinationAddress(final String name, final String type)
+ {
+ return isBroker10() ? name : String.format("ADDR: %s; { assert: never, node: { type: %s } }", name, type);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org