You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/07/17 13:12:41 UTC
[qpid-broker-j] branch 7.1.x updated: QPID-8342: [Broker-J] Virtual
host auto-creation policy should handle creation of duplicate nodes
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch 7.1.x
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
The following commit(s) were added to refs/heads/7.1.x by this push:
new 4f3f273 QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
4f3f273 is described below
commit 4f3f27312d08a69df4f6d87ac048439ebe516256
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Wed Jul 17 13:36:22 2019 +0100
QPID-8342: [Broker-J] Virtual host auto-creation policy should handle creation of duplicate nodes
(cherry picked from commit 5d295c0afe382140ef40b95b4ba9b91454d805c1)
---
.../server/virtualhost/AbstractVirtualHost.java | 7 +-
.../autocreation/NodeAutoCreationPolicyTest.java | 129 ++++++++++++++++++---
2 files changed, 116 insertions(+), 20 deletions(-)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index 7642ad0..856a627 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -1350,11 +1350,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
}
+ catch (AbstractConfiguredObject.DuplicateNameException e)
+ {
+ return (T)e.getExisting();
+ }
catch (RuntimeException e)
{
- LOGGER.info("Unable to auto create a node named {} due to exception", name, e);
+ LOGGER.info("Unable to auto create a node named '{}' due to exception", name, e);
}
-
}
}
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java
index 6e36749..c192895 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/extensions/autocreation/NodeAutoCreationPolicyTest.java
@@ -32,10 +32,17 @@ import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.JMSException;
@@ -63,6 +70,10 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
private static final String DEAD_LETTER_QUEUE_SUFFIX = "_DLQ";
private static final String DEAD_LETTER_EXCHANGE_SUFFIX = "_DLE";
private static final String AUTO_CREATION_POLICIES = createAutoCreationPolicies();
+ private static final String TEST_MESSAGE = "Hello world!";
+ private static final String VALID_QUEUE_NAME = "fooQueue";
+ private static final String TYPE_QUEUE = "queue";
+ private static final String TYPE_TOPIC = "topic";
private static String createAutoCreationPolicies()
@@ -227,17 +238,15 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue queue = session.createQueue(getProtocol() == Protocol.AMQP_1_0
- ? "fooQueue"
- : "ADDR: fooQueue ; { assert: never, node: { type: queue } }");
+ final Queue queue = session.createQueue(getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE));
final MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("Hello world!"));
+ producer.send(session.createTextMessage(TEST_MESSAGE));
final MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(getReceiveTimeout());
assertNotNull(received);
assertTrue(received instanceof TextMessage);
- assertEquals("Hello world!", ((TextMessage) received).getText());
+ assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
}
finally
{
@@ -246,6 +255,89 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
}
@Test
+ public void testConcurrentQueueCreation() throws Exception
+ {
+ updateAutoCreationPolicies();
+
+ final String destination = getDestinationAddress(VALID_QUEUE_NAME, TYPE_QUEUE);
+ final int numberOfActors = 3;
+ final Connection[] connections = new Connection[numberOfActors];
+ try
+ {
+ final Session[] sessions = new Session[numberOfActors];
+ for (int i = 0; i < numberOfActors; i++)
+ {
+ final Connection connection = getConnection();
+ connections[i] = connection;
+ sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ final List<CompletableFuture<MessageProducer>> futures = new ArrayList<>(numberOfActors);
+ final ExecutorService executorService = Executors.newFixedThreadPool(numberOfActors);
+ try
+ {
+ Stream.of(sessions)
+ .forEach(session -> futures.add(CompletableFuture.supplyAsync(() -> publishMessage(session,
+ destination),
+ executorService)));
+ final CompletableFuture<Void> combinedFuture =
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[numberOfActors]));
+ combinedFuture.get(getReceiveTimeout(), TimeUnit.MILLISECONDS);
+ }
+ finally
+ {
+ executorService.shutdown();
+ }
+
+ final Connection connection = getConnection();
+ try
+ {
+ connection.start();
+ final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Queue queue = session.createQueue(destination);
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ for (int i = 0; i < numberOfActors; i++)
+ {
+ Message received = consumer.receive(getReceiveTimeout());
+ assertNotNull(received);
+ assertTrue(received instanceof TextMessage);
+ assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+ finally
+ {
+ for (Connection connection : connections)
+ {
+ if (connection != null)
+ {
+ connection.close();
+ }
+ }
+ }
+ }
+
+ private MessageProducer publishMessage(final Session session, final String destination)
+ {
+ try
+ {
+ final Queue queue = session.createQueue(destination);
+ final MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage(TEST_MESSAGE));
+ return producer;
+ }
+ catch (JMSException e)
+ {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ @Test
public void testSendingToNonMatchingQueuePattern() throws Exception
{
updateAutoCreationPolicies();
@@ -254,9 +346,7 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Queue queue = session.createQueue(getProtocol() == Protocol.AMQP_1_0
- ? "foQueue"
- : "ADDR: foQueue ; { assert: never, node: { type: queue } }");
+ final Queue queue = session.createQueue(getDestinationAddress("foQueue", TYPE_QUEUE));
try
{
session.createProducer(queue);
@@ -283,11 +373,9 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic topic = session.createTopic(getProtocol() == Protocol.AMQP_1_0
- ? "barExchange/foo"
- : "ADDR: barExchange/foo ; { assert: never, node: { type: topic } }");
+ final Topic topic = session.createTopic(getDestinationAddress("barExchange/foo", TYPE_TOPIC));
final MessageProducer producer = session.createProducer(topic);
- producer.send(session.createTextMessage("Hello world!"));
+ producer.send(session.createTextMessage(TEST_MESSAGE));
final MessageConsumer consumer = session.createConsumer(topic);
Message received = consumer.receive(getReceiveTimeout() / 4);
@@ -316,9 +404,7 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- final Topic topic = session.createTopic(getProtocol() == Protocol.AMQP_1_0
- ? "baa"
- : "ADDR: baa ; { assert: never, node: { type: topic } }");
+ final Topic topic = session.createTopic(getDestinationAddress("baa", TYPE_TOPIC));
try
{
session.createProducer(topic);
@@ -350,13 +436,13 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue("BURL:direct:///fooQ/fooQ");
final MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("Hello world!"));
+ producer.send(session.createTextMessage(TEST_MESSAGE));
final MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(getReceiveTimeout());
assertNotNull(received);
assertTrue(received instanceof TextMessage);
- assertEquals("Hello world!", ((TextMessage) received).getText());
+ assertEquals(TEST_MESSAGE, ((TextMessage) received).getText());
}
finally
{
@@ -380,7 +466,7 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
try
{
final MessageProducer producer = session.createProducer(queue);
- producer.send(session.createTextMessage("Hello world!"));
+ producer.send(session.createTextMessage(TEST_MESSAGE));
fail("Sending a message should fail");
}
@@ -467,4 +553,11 @@ public class NodeAutoCreationPolicyTest extends JmsTestBase
}
return actualAlternateBindingMap;
}
+
+ private String getDestinationAddress(final String name, final String type)
+ {
+ return getProtocol() == Protocol.AMQP_1_0
+ ? name
+ : String.format("ADDR: %s; { assert: never, node: { type: %s } }", name, type);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org