You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/19 16:47:40 UTC
qpid-broker-j git commit: QPID-8164: Make sure that only own connection consumers can consume from JMS temporary destinations
Repository: qpid-broker-j
Updated Branches:
refs/heads/master dbd42eaf5 -> c45aea4c4
QPID-8164: Make sure that only own connection consumers can consume from JMS temporary destinations
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c45aea4c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c45aea4c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c45aea4c
Branch: refs/heads/master
Commit: c45aea4c41a7c389c58ede39e8cb8913b25cfab2
Parents: dbd42ea
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Apr 18 22:14:11 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Apr 19 17:45:49 2018 +0100
----------------------------------------------------------------------
.../qpid/server/protocol/v1_0/Session_1_0.java | 21 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 4 +
.../bindmapjms/TemporaryDestinationTest.java | 318 ++++++++++++++++++-
.../jms_1_1/queue/TemporaryQueueTest.java | 35 ++
4 files changed, 366 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ec875e1..cd8eb83 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -28,6 +28,7 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.text.MessageFormat;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -64,6 +65,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.DestinationAddress;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
@@ -715,7 +717,10 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (Boolean.TRUE.equals(source.getDynamic()))
{
- MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties());
+ final Set<Symbol> sourceCapabilities = source.getCapabilities() == null
+ ? Collections.emptySet()
+ : new HashSet<>(Arrays.asList(source.getCapabilities()));
+ MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties(), sourceCapabilities);
if(tempSource != null)
{
source.setAddress(_primaryDomain + tempSource.getName());
@@ -789,7 +794,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
return exchangeDestination;
}
- private MessageSource createDynamicSource(final Link_1_0<?, ?> link, Map properties) throws AmqpErrorException
+ private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
+ Map properties,
+ final Set<Symbol> capabilities) throws AmqpErrorException
{
// TODO temporary topics?
final String queueName = "TempQueue" + UUID.randomUUID().toString();
@@ -797,6 +804,12 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, queueName);
+ if (capabilities.contains(Symbol.valueOf("temporary-queue"))
+ || capabilities.contains(Symbol.valueOf("temporary-topic")))
+ {
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+ }
+
return Subject.doAs(getSubjectWithAddedSystemRights(),
(PrivilegedAction<MessageSource>) () -> getAddressSpace().createMessageSource(MessageSource.class, attributes));
}
@@ -829,6 +842,10 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
}
+ else if (capabilitySet.contains(Symbol.valueOf("temporary-queue")))
+ {
+ attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+ }
return Subject.doAs(getSubjectWithAddedSystemRights(),
(PrivilegedAction<MessageDestination>) () -> getAddressSpace().createMessageDestination(clazz, attributes));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 91ab75b..cc689b9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -439,6 +439,10 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
{
targetCapabilities.add(Symbol.valueOf("temporary-topic"));
}
+ if (desiredCapabilities.contains(Symbol.valueOf("temporary-queue")))
+ {
+ targetCapabilities.add(Symbol.valueOf("temporary-queue"));
+ }
if (desiredCapabilities.contains(Symbol.valueOf("topic")))
{
targetCapabilities.add(Symbol.valueOf("topic"));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index a3270a0..4160336 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -20,32 +20,41 @@
package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import java.net.InetSocketAddress;
import java.util.Collections;
+import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.Session_1_0;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
{
@@ -83,11 +92,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- Target target = new Target();
- target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
- target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
- target.setDynamic(true);
- target.setCapabilities(targetCapabilities);
+ Target target = createTarget(targetCapabilities);
final Interaction interaction = transport.newInteraction();
final Attach attachResponse = interaction.negotiateProtocol().consumeResponse()
@@ -104,8 +109,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
newTemporaryNodeAddress = ((Target) attachResponse.getTarget()).getAddress();
assertThat(newTemporaryNodeAddress, is(notNullValue()));
- assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(true));
-
interaction.consumeResponse().getLatestResponse(Flow.class);
interaction.doCloseConnection();
@@ -113,4 +116,299 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
}
+
+
+ @Test
+ @SpecificationTest(section = "N/A",
+ description = "JMS 2.0."
+ + " 6.2.2. Creating temporary destinations"
+ + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations"
+ + " that are system - generated uniquely for their connection. Only their own connection"
+ + " is allowed to create consumer objects for them."
+ + ""
+ + "4.1.5. TemporaryQueue"
+ + "A TemporaryQueue is a unique Queue object created for the duration of a connection."
+ + " It is a system-defined queue that can only be consumed by the connection that created it."
+ + ""
+ + "AMQP JMS Mapping."
+ + " 5.2. Destinations And Producers/Consumers"
+ + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+ + " for the application by supplying a terminus capability for the particular Destination"
+ + " type to which the client expects to attach [...]"
+ + "TemporaryQueue Terminus capability : 'temporary-queue'")
+ public void canConsumeFormTemporaryQueueCreatedOnTheSameConnection() throws Exception
+ {
+ final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Target target = createTarget(capabilities);
+
+ final Interaction interaction = transport.newInteraction();
+ final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+ final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachHandle(senderHandle)
+ .attachTarget(target)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+ assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+ String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress();
+ assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+ interaction.consumeResponse().getLatestResponse(Flow.class);
+
+ final Attach receiverAttachResponse = interaction.attachRole(Role.RECEIVER)
+ .attachSource(createSource(newTemporaryNodeAddress,
+ capabilities))
+ .attachHandle(UnsignedInteger.valueOf(2))
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+ assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+ assertThat(((Source) receiverAttachResponse.getSource()).getAddress(),
+ is(equalTo(newTemporaryNodeAddress)));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "N/A",
+ description = "JMS 2.0."
+ + " 6.2.2. Creating temporary destinations"
+ + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations"
+ + " that are system - generated uniquely for their connection. Only their own connection"
+ + " is allowed to create consumer objects for them."
+ + ""
+ + "4.1.5. TemporaryQueue"
+ + "A TemporaryQueue is a unique Queue object created for the duration of a connection."
+ + " It is a system-defined queue that can only be consumed by the connection that created it."
+ + ""
+ + "AMQP JMS Mapping."
+ + " 5.2. Destinations And Producers/Consumers"
+ + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+ + " for the application by supplying a terminus capability for the particular Destination"
+ + " type to which the client expects to attach [...]"
+ + "TemporaryQueue Terminus capability : 'temporary-queue'")
+ public void canNotConsumeFormTemporaryQueueCreatedOnOtherConnection() throws Exception
+ {
+ final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Target target = createTarget(capabilities);
+
+ final Interaction interaction = transport.newInteraction();
+ final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+ final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachHandle(senderHandle)
+ .attachTarget(target)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+ assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+ String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress();
+ assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+ interaction.consumeResponse().getLatestResponse(Flow.class);
+
+ tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "N/A",
+ description = "JMS 2.0."
+ + " 6.2.2. Creating temporary destinations"
+ + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations"
+ + " that are system - generated uniquely for their connection. Only their own connection"
+ + " is allowed to create consumer objects for them."
+ + ""
+ + "4.2.7. Temporary topics"
+ + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext,"
+ + " Connection or TopicConnection . It is a system defined Topic whose messages may be"
+ + " consumed only by the connection that created it."
+ + ""
+ + "AMQP JMS Mapping."
+ + " 5.2. Destinations And Producers/Consumers"
+ + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+ + " for the application by supplying a terminus capability for the particular Destination"
+ + " type to which the client expects to attach"
+ + "TemporaryTopic Terminus capability : 'temporary-topic'")
+ public void canConsumeFormTemporaryTopicCreatedOnTheSameConnection() throws Exception
+ {
+ final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ source.setCapabilities(capabilities);
+ source.setDynamic(true);
+
+ final Interaction interaction = transport.newInteraction();
+ final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+ final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attachHandle(receiverHandle)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+ assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+ String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
+ assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+ Target target = new Target();
+ target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+ target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ target.setCapabilities(capabilities);
+ target.setAddress(newTemporaryNodeAddress);
+
+ final UnsignedInteger senderHandle = UnsignedInteger.valueOf(2);
+ interaction.attachRole(Role.SENDER)
+ .attachHandle(senderHandle)
+ .attachTarget(target)
+ .attach()
+ .consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+
+ String testData = "testData";
+ Disposition responseDisposition = interaction.transferPayloadData(testData)
+ .transferHandle(senderHandle)
+ .transfer()
+ .consumeResponse()
+ .getLatestResponse(Disposition.class);
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+ assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+
+ interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandle(receiverHandle)
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ assertThat(interaction.getDecodedLatestDelivery(), is(equalTo(testData)));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "N/A",
+ description = "JMS 2.0."
+ + " 6.2.2. Creating temporary destinations"
+ + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations"
+ + " that are system - generated uniquely for their connection. Only their own connection"
+ + " is allowed to create consumer objects for them."
+ + ""
+ + "4.2.7. Temporary topics"
+ + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext,"
+ + " Connection or TopicConnection . It is a system defined Topic whose messages may be"
+ + " consumed only by the connection that created it."
+ + ""
+ + "AMQP JMS Mapping."
+ + " 5.2. Destinations And Producers/Consumers"
+ + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+ + " for the application by supplying a terminus capability for the particular Destination"
+ + " type to which the client expects to attach"
+ + "TemporaryTopic Terminus capability : 'temporary-topic'")
+ public void canNotConsumeFormTemporaryTopicCreatedOnOtherConnection() throws Exception
+ {
+ final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ source.setCapabilities(capabilities);
+ source.setDynamic(true);
+
+ final Interaction interaction = transport.newInteraction();
+ final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+ final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attachHandle(receiverHandle)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+ assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+ String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
+ assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+ tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ private void tryToConsume(final Source source) throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final Detach responseDetach = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attach()
+ .consumeResponse(Attach.class)
+ .consumeResponse(Detach.class)
+ .getLatestResponse(Detach.class);
+ assertThat(responseDetach.getClosed(), is(true));
+ assertThat(responseDetach.getError(), is(Matchers.notNullValue()));
+ assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
+ interaction.doCloseConnection();
+ }
+ }
+
+ private Target createTarget(final Symbol[] capabilities)
+ {
+ Target target = new Target();
+ target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+ target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ target.setDynamic(true);
+ target.setCapabilities(capabilities);
+ return target;
+ }
+
+ private Source createSource(final String name, final Symbol[] capabilities)
+ {
+ final Source source = new Source();
+ source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ source.setCapabilities(capabilities);
+ source.setAddress(name);
+ return source;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
index f7b8c18..833d7d6 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
@@ -100,6 +100,41 @@ public class TemporaryQueueTest extends JmsTestBase
}
@Test
+ public void testConsumeFromAnotherConnectionUsingTemporaryQueueName() throws Exception
+ {
+ final Connection connection = getConnection();
+ try
+ {
+ final Connection connection2 = getConnection();
+ try
+ {
+ final Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ final TemporaryQueue queue = session1.createTemporaryQueue();
+ assertNotNull("Temporary queue cannot be null", queue);
+
+ try
+ {
+ session2.createConsumer(session2.createQueue(queue.getQueueName()));
+ fail("Expected a JMSException when subscribing to a temporary queue created on a different connection");
+ }
+ catch (JMSException je)
+ {
+ // pass: a consumer from a connection that does not own the temporary queue is expected to be rejected
+ }
+ }
+ finally
+ {
+ connection2.close();
+ }
+ }
+ finally
+ {
+ connection.close();
+ }
+ }
+
+ @Test
public void testPublishFromAnotherConnectionAllowed() throws Exception
{
final Connection connection = getConnection();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org