You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/19 16:47:40 UTC

qpid-broker-j git commit: QPID-8164: Make sure that only own connection consumers can consume from JMS temporary destinations

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master dbd42eaf5 -> c45aea4c4


QPID-8164: Make sure that only own connection consumers can consume from JMS temporary destinations


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c45aea4c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c45aea4c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c45aea4c

Branch: refs/heads/master
Commit: c45aea4c41a7c389c58ede39e8cb8913b25cfab2
Parents: dbd42ea
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Apr 18 22:14:11 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu Apr 19 17:45:49 2018 +0100

----------------------------------------------------------------------
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  21 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     |   4 +
 .../bindmapjms/TemporaryDestinationTest.java    | 318 ++++++++++++++++++-
 .../jms_1_1/queue/TemporaryQueueTest.java       |  35 ++
 4 files changed, 366 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index ec875e1..cd8eb83 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -28,6 +28,7 @@ import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -64,6 +65,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.DestinationAddress;
 import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.ExclusivityPolicy;
 import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
@@ -715,7 +717,10 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
         if (Boolean.TRUE.equals(source.getDynamic()))
         {
-            MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties());
+            final Set<Symbol> sourceCapabilities = source.getCapabilities() == null
+                    ? Collections.emptySet()
+                    : new HashSet<>(Arrays.asList(source.getCapabilities()));
+            MessageSource tempSource = createDynamicSource(link, source.getDynamicNodeProperties(), sourceCapabilities);
             if(tempSource != null)
             {
                 source.setAddress(_primaryDomain + tempSource.getName());
@@ -789,7 +794,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         return exchangeDestination;
     }
 
-    private MessageSource createDynamicSource(final Link_1_0<?, ?> link, Map properties) throws AmqpErrorException
+    private MessageSource createDynamicSource(final Link_1_0<?, ?> link,
+                                              Map properties,
+                                              final Set<Symbol> capabilities) throws AmqpErrorException
     {
         // TODO temporary topics?
         final String queueName = "TempQueue" + UUID.randomUUID().toString();
@@ -797,6 +804,12 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         {
             Map<String, Object> attributes = convertDynamicNodePropertiesToAttributes(link, properties, queueName);
 
+            if (capabilities.contains(Symbol.valueOf("temporary-queue"))
+                || capabilities.contains(Symbol.valueOf("temporary-topic")))
+            {
+                attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+            }
+
             return Subject.doAs(getSubjectWithAddedSystemRights(),
                                 (PrivilegedAction<MessageSource>) () -> getAddressSpace().createMessageSource(MessageSource.class, attributes));
         }
@@ -829,6 +842,10 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
             {
                 attributes.put(Exchange.TYPE, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
             }
+            else if (capabilitySet.contains(Symbol.valueOf("temporary-queue")))
+            {
+                attributes.put(Queue.EXCLUSIVE, ExclusivityPolicy.CONNECTION);
+            }
 
             return Subject.doAs(getSubjectWithAddedSystemRights(),
                                 (PrivilegedAction<MessageDestination>) () -> getAddressSpace().createMessageDestination(clazz, attributes));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 91ab75b..cc689b9 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -439,6 +439,10 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
             {
                 targetCapabilities.add(Symbol.valueOf("temporary-topic"));
             }
+            if (desiredCapabilities.contains(Symbol.valueOf("temporary-queue")))
+            {
+                targetCapabilities.add(Symbol.valueOf("temporary-queue"));
+            }
             if (desiredCapabilities.contains(Symbol.valueOf("topic")))
             {
                 targetCapabilities.add(Symbol.valueOf("topic"));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index a3270a0..4160336 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -20,32 +20,41 @@
 
 package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
 
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
 
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.Session_1_0;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 {
@@ -83,11 +92,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            Target target = new Target();
-            target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
-            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
-            target.setDynamic(true);
-            target.setCapabilities(targetCapabilities);
+            Target target = createTarget(targetCapabilities);
 
             final Interaction interaction = transport.newInteraction();
             final Attach attachResponse = interaction.negotiateProtocol().consumeResponse()
@@ -104,8 +109,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
             newTemporaryNodeAddress = ((Target) attachResponse.getTarget()).getAddress();
             assertThat(newTemporaryNodeAddress, is(notNullValue()));
 
-            assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(true));
-
             interaction.consumeResponse().getLatestResponse(Flow.class);
 
             interaction.doCloseConnection();
@@ -113,4 +116,299 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 
         assertThat(Utils.doesNodeExist(_brokerAddress, newTemporaryNodeAddress), is(false));
     }
+
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.1.5. TemporaryQueue"
+                          + "A TemporaryQueue is a unique Queue object created for the duration of a connection."
+                          + " It is a system-defined queue that can only be consumed by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+                          + " for the application by supplying a terminus capability for the particular Destination"
+                          + " type to which the client expects to attach [...]"
+                          + "TemporaryQueue Terminus capability : 'temporary-queue'")
+    public void canConsumeFormTemporaryQueueCreatedOnTheSameConnection() throws Exception
+    {
+        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Target target = createTarget(capabilities);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+            final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                           .open().consumeResponse(Open.class)
+                                                           .begin().consumeResponse(Begin.class)
+                                                           .attachRole(Role.SENDER)
+                                                           .attachHandle(senderHandle)
+                                                           .attachTarget(target)
+                                                           .attach().consumeResponse()
+                                                           .getLatestResponse(Attach.class);
+
+            assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+            assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            interaction.consumeResponse().getLatestResponse(Flow.class);
+
+            final Attach receiverAttachResponse = interaction.attachRole(Role.RECEIVER)
+                                                             .attachSource(createSource(newTemporaryNodeAddress,
+                                                                                        capabilities))
+                                                             .attachHandle(UnsignedInteger.valueOf(2))
+                                                             .attach().consumeResponse()
+                                                             .getLatestResponse(Attach.class);
+
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(((Source) receiverAttachResponse.getSource()).getAddress(),
+                       is(equalTo(newTemporaryNodeAddress)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.1.5. TemporaryQueue"
+                          + "A TemporaryQueue is a unique Queue object created for the duration of a connection."
+                          + " It is a system-defined queue that can only be consumed by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+                          + " for the application by supplying a terminus capability for the particular Destination"
+                          + " type to which the client expects to attach [...]"
+                          + "TemporaryQueue Terminus capability : 'temporary-queue'")
+    public void canNotConsumeFormTemporaryQueueCreatedOnOtherConnection() throws Exception
+    {
+        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Target target = createTarget(capabilities);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+            final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                           .open().consumeResponse(Open.class)
+                                                           .begin().consumeResponse(Begin.class)
+                                                           .attachRole(Role.SENDER)
+                                                           .attachHandle(senderHandle)
+                                                           .attachTarget(target)
+                                                           .attach().consumeResponse()
+                                                           .getLatestResponse(Attach.class);
+
+            assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+            assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            interaction.consumeResponse().getLatestResponse(Flow.class);
+
+            tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.2.7. Temporary topics"
+                          + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext,"
+                          + " Connection or TopicConnection . It is a system defined Topic whose messages may be"
+                          + " consumed only by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+                          + " for the application by supplying a terminus capability for the particular Destination"
+                          + " type to which the client expects to attach"
+                          + "TemporaryTopic Terminus capability : 'temporary-topic'")
+    public void canConsumeFormTemporaryTopicCreatedOnTheSameConnection() throws Exception
+    {
+        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setCapabilities(capabilities);
+            source.setDynamic(true);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+            final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                             .open().consumeResponse(Open.class)
+                                                             .begin().consumeResponse(Begin.class)
+                                                             .attachRole(Role.RECEIVER)
+                                                             .attachSource(source)
+                                                             .attachHandle(receiverHandle)
+                                                             .attach().consumeResponse()
+                                                             .getLatestResponse(Attach.class);
+
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            Target target = new Target();
+            target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+            target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            target.setCapabilities(capabilities);
+            target.setAddress(newTemporaryNodeAddress);
+
+            final UnsignedInteger senderHandle = UnsignedInteger.valueOf(2);
+            interaction.attachRole(Role.SENDER)
+                       .attachHandle(senderHandle)
+                       .attachTarget(target)
+                       .attach()
+                       .consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            String testData = "testData";
+            Disposition responseDisposition = interaction.transferPayloadData(testData)
+                                                         .transferHandle(senderHandle)
+                                                         .transfer()
+                                                         .consumeResponse()
+                                                         .getLatestResponse(Disposition.class);
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandle(receiverHandle)
+                       .flow()
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            assertThat(interaction.getDecodedLatestDelivery(), is(equalTo(testData)));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them."
+                          + ""
+                          + "4.2.7. Temporary topics"
+                          + "A TemporaryTopic is a unique Topic object created for the duration of a JMSContext,"
+                          + " Connection or TopicConnection . It is a system defined Topic whose messages may be"
+                          + " consumed only by the connection that created it."
+                          + ""
+                          + "AMQP JMS Mapping."
+                          + " 5.2. Destinations And Producers/Consumers"
+                          + "[...] type information SHOULD be conveyed when creating producer or consumer links"
+                          + " for the application by supplying a terminus capability for the particular Destination"
+                          + " type to which the client expects to attach"
+                          + "TemporaryTopic Terminus capability : 'temporary-topic'")
+    public void canNotConsumeFormTemporaryTopicCreatedOnOtherConnection() throws Exception
+    {
+        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setCapabilities(capabilities);
+            source.setDynamic(true);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+            final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                             .open().consumeResponse(Open.class)
+                                                             .begin().consumeResponse(Begin.class)
+                                                             .attachRole(Role.RECEIVER)
+                                                             .attachSource(source)
+                                                             .attachHandle(receiverHandle)
+                                                             .attach().consumeResponse()
+                                                             .getLatestResponse(Attach.class);
+
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    private void tryToConsume(final Source source) throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final Detach responseDetach = interaction.negotiateProtocol()
+                                                     .consumeResponse()
+                                                     .open()
+                                                     .consumeResponse(Open.class)
+                                                     .begin()
+                                                     .consumeResponse(Begin.class)
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSource(source)
+                                                     .attach()
+                                                     .consumeResponse(Attach.class)
+                                                     .consumeResponse(Detach.class)
+                                                     .getLatestResponse(Detach.class);
+            assertThat(responseDetach.getClosed(), is(true));
+            assertThat(responseDetach.getError(), is(Matchers.notNullValue()));
+            assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
+            interaction.doCloseConnection();
+        }
+    }
+
+    private Target createTarget(final Symbol[] capabilities)
+    {
+        Target target = new Target();
+        target.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+        target.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        target.setDynamic(true);
+        target.setCapabilities(capabilities);
+        return target;
+    }
+
+    private Source createSource(final String name, final Symbol[] capabilities)
+    {
+        final Source source = new Source();
+        source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+        source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+        source.setCapabilities(capabilities);
+        source.setAddress(name);
+        return source;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c45aea4c/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
----------------------------------------------------------------------
diff --git a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
index f7b8c18..833d7d6 100644
--- a/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
+++ b/systests/qpid-systests-jms_1.1/src/test/java/org/apache/qpid/systests/jms_1_1/queue/TemporaryQueueTest.java
@@ -100,6 +100,41 @@ public class TemporaryQueueTest extends JmsTestBase
     }
 
     @Test
+    public void testConsumeFromAnotherConnectionUsingTemporaryQueueName() throws Exception
+    {
+        final Connection connection = getConnection();
+        try
+        {
+            final Connection connection2 = getConnection();
+            try
+            {
+                final Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                final TemporaryQueue queue = session1.createTemporaryQueue();
+                assertNotNull("Temporary queue cannot be null", queue);
+
+                try
+                {
+                    session2.createConsumer(session2.createQueue(queue.getQueueName()));
+                    fail("Expected a JMSException when subscribing to a temporary queue created on a different session");
+                }
+                catch (JMSException je)
+                {
+                    //pass
+                }
+            }
+            finally
+            {
+                connection2.close();
+            }
+        }
+        finally
+        {
+            connection.close();
+        }
+    }
+
+    @Test
     public void testPublishFromAnotherConnectionAllowed() throws Exception
     {
         final Connection connection = getConnection();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org