You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/23 12:22:21 UTC

[2/2] qpid-broker-j git commit: QPID-8164: Add tests ensuring that sending links to temporary queues/topics from other connections are allowed.

QPID-8164: Add tests ensuring that sending links to temporary queues/topics from other connections are allowed.

(cherry picked from commit 4dcbb741b4e89f109777777154b4f74632f76683)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bc13e07a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bc13e07a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bc13e07a

Branch: refs/heads/7.0.x
Commit: bc13e07a4fe398749298f42726c9d03b5f38a77f
Parents: 73d4350
Author: Keith Wall <kw...@apache.org>
Authored: Mon Apr 23 09:23:38 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Apr 23 11:54:51 2018 +0100

----------------------------------------------------------------------
 .../bindmapjms/TemporaryDestinationTest.java    | 160 ++++++++++++++-----
 1 file changed, 119 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bc13e07a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 4160336..1dd63ea 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -20,7 +20,6 @@
 
 package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
 
-import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -36,7 +35,6 @@ import org.junit.Test;
 import org.apache.qpid.server.protocol.v1_0.Session_1_0;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -45,7 +43,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -58,6 +55,8 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
 public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 {
+    private static final Symbol TEMPORARY_QUEUE = Symbol.valueOf("temporary-queue");
+    private static final Symbol TEMPORARY_TOPIC = Symbol.valueOf("temporary-topic");
     private InetSocketAddress _brokerAddress;
 
     @Before
@@ -73,7 +72,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                           + "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.")
     public void deleteOnCloseWithConnectionCloseForQueue() throws Exception
     {
-        deleteOnCloseWithConnectionClose(new Symbol[]{Symbol.valueOf("temporary-queue")});
+        deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_QUEUE});
     }
 
     @Test
@@ -83,12 +82,12 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                           + "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.")
     public void deleteOnCloseWithConnectionCloseForTopic() throws Exception
     {
-        deleteOnCloseWithConnectionClose(new Symbol[]{Symbol.valueOf("temporary-topic")});
+        deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_TOPIC});
     }
 
     private void deleteOnCloseWithConnectionClose(final Symbol[] targetCapabilities) throws Exception
     {
-        String newTemporaryNodeAddress = null;
+        String newTemporaryNodeAddress;
 
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
@@ -136,9 +135,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                           + " for the application by supplying a terminus capability for the particular Destination"
                           + " type to which the client expects to attach [...]"
                           + "TemporaryQueue Terminus capability : 'temporary-queue'")
-    public void canConsumeFormTemporaryQueueCreatedOnTheSameConnection() throws Exception
+    public void createTemporaryQueueReceivingLink() throws Exception
     {
-        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+        final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE};
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             Target target = createTarget(capabilities);
@@ -196,9 +195,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                           + " for the application by supplying a terminus capability for the particular Destination"
                           + " type to which the client expects to attach [...]"
                           + "TemporaryQueue Terminus capability : 'temporary-queue'")
-    public void canNotConsumeFormTemporaryQueueCreatedOnOtherConnection() throws Exception
+    public void createTemporaryQueueReceivingLinkFromOtherConnectionDisallowed() throws Exception
     {
-        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+        final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE};
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             Target target = createTarget(capabilities);
@@ -222,7 +221,46 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
 
             interaction.consumeResponse().getLatestResponse(Flow.class);
 
-            tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+            assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED);
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them.")
+    public void createTemporaryQueueSendingLinkFromOtherConnectionAllowed() throws Exception
+    {
+        final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE};
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Target target = createTarget(capabilities);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+            final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                           .open().consumeResponse(Open.class)
+                                                           .begin().consumeResponse(Begin.class)
+                                                           .attachRole(Role.SENDER)
+                                                           .attachHandle(senderHandle)
+                                                           .attachTarget(target)
+                                                           .attach().consumeResponse()
+                                                           .getLatestResponse(Attach.class);
+
+            assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+            assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            interaction.consumeResponse().getLatestResponse(Flow.class);
+
+            assertSendingLinkSucceeds(newTemporaryNodeAddress);
 
             interaction.doCloseConnection();
         }
@@ -247,9 +285,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                           + " for the application by supplying a terminus capability for the particular Destination"
                           + " type to which the client expects to attach"
                           + "TemporaryTopic Terminus capability : 'temporary-topic'")
-    public void canConsumeFormTemporaryTopicCreatedOnTheSameConnection() throws Exception
+    public void createTemporaryTopicSubscriptionReceivingLink() throws Exception
     {
-        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+        final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC};
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Source source = new Source();
@@ -270,7 +308,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                                                              .getLatestResponse(Attach.class);
 
             assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
-            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
 
             String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
             assertThat(newTemporaryNodeAddress, is(notNullValue()));
@@ -289,28 +326,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                        .consumeResponse(Attach.class)
                        .consumeResponse(Flow.class);
 
-            String testData = "testData";
-            Disposition responseDisposition = interaction.transferPayloadData(testData)
-                                                         .transferHandle(senderHandle)
-                                                         .transfer()
-                                                         .consumeResponse()
-                                                         .getLatestResponse(Disposition.class);
-            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
-            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
-            assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
-
-            interaction.flowIncomingWindow(UnsignedInteger.ONE)
-                       .flowNextIncomingId(UnsignedInteger.ZERO)
-                       .flowOutgoingWindow(UnsignedInteger.ZERO)
-                       .flowNextOutgoingId(UnsignedInteger.ZERO)
-                       .flowLinkCredit(UnsignedInteger.ONE)
-                       .flowHandle(receiverHandle)
-                       .flow()
-                       .receiveDelivery()
-                       .decodeLatestDelivery();
-
-            assertThat(interaction.getDecodedLatestDelivery(), is(equalTo(testData)));
-
             interaction.doCloseConnection();
         }
     }
@@ -334,9 +349,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                           + " for the application by supplying a terminus capability for the particular Destination"
                           + " type to which the client expects to attach"
                           + "TemporaryTopic Terminus capability : 'temporary-topic'")
-    public void canNotConsumeFormTemporaryTopicCreatedOnOtherConnection() throws Exception
+    public void createTemporaryTopicSubscriptionReceivingLinkFromOtherConnectionDisallowed() throws Exception
     {
-        final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+        final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC};
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Source source = new Source();
@@ -362,13 +377,53 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
             String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
             assertThat(newTemporaryNodeAddress, is(notNullValue()));
 
-            tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+            assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED);
 
             interaction.doCloseConnection();
         }
     }
 
-    private void tryToConsume(final Source source) throws Exception
+    @Test
+    @SpecificationTest(section = "N/A",
+            description = "JMS 2.0."
+                          + " 6.2.2. Creating temporary destinations"
+                          + "Temporary destinations ( TemporaryQueue or  TemporaryTopic objects) are destinations"
+                          + " that are system - generated uniquely for their connection.  Only their own connection"
+                          + " is allowed to create consumer objects for them.")
+    public void createTemporaryTopicSendingLinkFromOtherConnectionAllowed() throws Exception
+    {
+        final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC};
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Source source = new Source();
+            source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+            source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+            source.setCapabilities(capabilities);
+            source.setDynamic(true);
+
+            final Interaction interaction = transport.newInteraction();
+            final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+            final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse()
+                                                             .open().consumeResponse(Open.class)
+                                                             .begin().consumeResponse(Begin.class)
+                                                             .attachRole(Role.RECEIVER)
+                                                             .attachSource(source)
+                                                             .attachHandle(receiverHandle)
+                                                             .attach().consumeResponse()
+                                                             .getLatestResponse(Attach.class);
+
+            assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+            String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
+            assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+            assertSendingLinkSucceeds(newTemporaryNodeAddress);
+
+            interaction.doCloseConnection();
+        }
+    }
+
+    private void assertReceivingLinkFails(final Source source, final AmqpError expectedError) throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
@@ -387,7 +442,30 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
                                                      .getLatestResponse(Detach.class);
             assertThat(responseDetach.getClosed(), is(true));
             assertThat(responseDetach.getError(), is(Matchers.notNullValue()));
-            assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
+            assertThat(responseDetach.getError().getCondition(), is(equalTo(expectedError)));
+            interaction.doCloseConnection();
+        }
+    }
+
+    private void assertSendingLinkSucceeds(final String address) throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Target target = new Target();
+            target.setAddress(address);
+
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                        .consumeResponse()
+                        .open()
+                        .consumeResponse(Open.class)
+                        .begin()
+                        .consumeResponse(Begin.class)
+                        .attachRole(Role.SENDER)
+                        .attachTarget(target)
+                        .attach()
+                        .consumeResponse(Attach.class)
+                        .consumeResponse(Flow.class);
             interaction.doCloseConnection();
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org