You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/04/23 12:22:21 UTC
[2/2] qpid-broker-j git commit: QPID-8164: Add tests ensuring that
sending links to temporary queues/topics from other connections are allowed.
QPID-8164: Add tests ensuring that sending links to temporary queues/topics from other connections are allowed.
(cherry picked from commit 4dcbb741b4e89f109777777154b4f74632f76683)
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/bc13e07a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/bc13e07a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/bc13e07a
Branch: refs/heads/7.0.x
Commit: bc13e07a4fe398749298f42726c9d03b5f38a77f
Parents: 73d4350
Author: Keith Wall <kw...@apache.org>
Authored: Mon Apr 23 09:23:38 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Mon Apr 23 11:54:51 2018 +0100
----------------------------------------------------------------------
.../bindmapjms/TemporaryDestinationTest.java | 160 ++++++++++++++-----
1 file changed, 119 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/bc13e07a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
index 4160336..1dd63ea 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/bindmapjms/TemporaryDestinationTest.java
@@ -20,7 +20,6 @@
package org.apache.qpid.tests.protocol.v1_0.extensions.bindmapjms;
-import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -36,7 +35,6 @@ import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.Session_1_0;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnClose;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
@@ -45,7 +43,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -58,6 +55,8 @@ import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
{
+ private static final Symbol TEMPORARY_QUEUE = Symbol.valueOf("temporary-queue");
+ private static final Symbol TEMPORARY_TOPIC = Symbol.valueOf("temporary-topic");
private InetSocketAddress _brokerAddress;
@Before
@@ -73,7 +72,7 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
+ "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.")
public void deleteOnCloseWithConnectionCloseForQueue() throws Exception
{
- deleteOnCloseWithConnectionClose(new Symbol[]{Symbol.valueOf("temporary-queue")});
+ deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_QUEUE});
}
@Test
@@ -83,12 +82,12 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
+ "dynamic-node-properties field of target containing the “lifetime-policy” symbol key mapped to delete-on-close.")
public void deleteOnCloseWithConnectionCloseForTopic() throws Exception
{
- deleteOnCloseWithConnectionClose(new Symbol[]{Symbol.valueOf("temporary-topic")});
+ deleteOnCloseWithConnectionClose(new Symbol[]{TEMPORARY_TOPIC});
}
private void deleteOnCloseWithConnectionClose(final Symbol[] targetCapabilities) throws Exception
{
- String newTemporaryNodeAddress = null;
+ String newTemporaryNodeAddress;
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -136,9 +135,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
+ " for the application by supplying a terminus capability for the particular Destination"
+ " type to which the client expects to attach [...]"
+ "TemporaryQueue Terminus capability : 'temporary-queue'")
- public void canConsumeFormTemporaryQueueCreatedOnTheSameConnection() throws Exception
+ public void createTemporaryQueueReceivingLink() throws Exception
{
- final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+ final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE};
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
Target target = createTarget(capabilities);
@@ -196,9 +195,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
+ " for the application by supplying a terminus capability for the particular Destination"
+ " type to which the client expects to attach [...]"
+ "TemporaryQueue Terminus capability : 'temporary-queue'")
- public void canNotConsumeFormTemporaryQueueCreatedOnOtherConnection() throws Exception
+ public void createTemporaryQueueReceivingLinkFromOtherConnectionDisallowed() throws Exception
{
- final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-queue")};
+ final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE};
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
Target target = createTarget(capabilities);
@@ -222,7 +221,46 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
interaction.consumeResponse().getLatestResponse(Flow.class);
- tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+ assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED);
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "N/A",
+ description = "JMS 2.0."
+ + " 6.2.2. Creating temporary destinations"
+ + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations"
+ + " that are system - generated uniquely for their connection. Only their own connection"
+ + " is allowed to create consumer objects for them.")
+ public void createTemporaryQueueSendingLinkFromOtherConnectionAllowed() throws Exception
+ {
+ final Symbol[] capabilities = new Symbol[]{TEMPORARY_QUEUE};
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Target target = createTarget(capabilities);
+
+ final Interaction interaction = transport.newInteraction();
+ final UnsignedInteger senderHandle = UnsignedInteger.ONE;
+ final Attach senderAttachResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachHandle(senderHandle)
+ .attachTarget(target)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(senderAttachResponse.getSource(), is(notNullValue()));
+ assertThat(senderAttachResponse.getTarget(), is(notNullValue()));
+
+ String newTemporaryNodeAddress = ((Target) senderAttachResponse.getTarget()).getAddress();
+ assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+ interaction.consumeResponse().getLatestResponse(Flow.class);
+
+ assertSendingLinkSucceeds(newTemporaryNodeAddress);
interaction.doCloseConnection();
}
@@ -247,9 +285,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
+ " for the application by supplying a terminus capability for the particular Destination"
+ " type to which the client expects to attach"
+ "TemporaryTopic Terminus capability : 'temporary-topic'")
- public void canConsumeFormTemporaryTopicCreatedOnTheSameConnection() throws Exception
+ public void createTemporaryTopicSubscriptionReceivingLink() throws Exception
{
- final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+ final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC};
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Source source = new Source();
@@ -270,7 +308,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
.getLatestResponse(Attach.class);
assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
- assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
assertThat(newTemporaryNodeAddress, is(notNullValue()));
@@ -289,28 +326,6 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
.consumeResponse(Attach.class)
.consumeResponse(Flow.class);
- String testData = "testData";
- Disposition responseDisposition = interaction.transferPayloadData(testData)
- .transferHandle(senderHandle)
- .transfer()
- .consumeResponse()
- .getLatestResponse(Disposition.class);
- assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
- assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
- assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
-
- interaction.flowIncomingWindow(UnsignedInteger.ONE)
- .flowNextIncomingId(UnsignedInteger.ZERO)
- .flowOutgoingWindow(UnsignedInteger.ZERO)
- .flowNextOutgoingId(UnsignedInteger.ZERO)
- .flowLinkCredit(UnsignedInteger.ONE)
- .flowHandle(receiverHandle)
- .flow()
- .receiveDelivery()
- .decodeLatestDelivery();
-
- assertThat(interaction.getDecodedLatestDelivery(), is(equalTo(testData)));
-
interaction.doCloseConnection();
}
}
@@ -334,9 +349,9 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
+ " for the application by supplying a terminus capability for the particular Destination"
+ " type to which the client expects to attach"
+ "TemporaryTopic Terminus capability : 'temporary-topic'")
- public void canNotConsumeFormTemporaryTopicCreatedOnOtherConnection() throws Exception
+ public void createTemporaryTopicSubscriptionReceivingLinkFromOtherConnectionDisallowed() throws Exception
{
- final Symbol[] capabilities = new Symbol[]{Symbol.valueOf("temporary-topic")};
+ final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC};
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Source source = new Source();
@@ -362,13 +377,53 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
assertThat(newTemporaryNodeAddress, is(notNullValue()));
- tryToConsume(createSource(newTemporaryNodeAddress, capabilities));
+ assertReceivingLinkFails(createSource(newTemporaryNodeAddress, capabilities), AmqpError.RESOURCE_LOCKED);
interaction.doCloseConnection();
}
}
- private void tryToConsume(final Source source) throws Exception
+ @Test
+ @SpecificationTest(section = "N/A",
+ description = "JMS 2.0."
+ + " 6.2.2. Creating temporary destinations"
+ + "Temporary destinations ( TemporaryQueue or TemporaryTopic objects) are destinations"
+ + " that are system - generated uniquely for their connection. Only their own connection"
+ + " is allowed to create consumer objects for them.")
+ public void createTemporaryTopicSendingLinkFromOtherConnectionAllowed() throws Exception
+ {
+ final Symbol[] capabilities = new Symbol[]{TEMPORARY_TOPIC};
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Source source = new Source();
+ source.setDynamicNodeProperties(Collections.singletonMap(Session_1_0.LIFETIME_POLICY, new DeleteOnClose()));
+ source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
+ source.setCapabilities(capabilities);
+ source.setDynamic(true);
+
+ final Interaction interaction = transport.newInteraction();
+ final UnsignedInteger receiverHandle = UnsignedInteger.ONE;
+ final Attach receiverAttachResponse = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachSource(source)
+ .attachHandle(receiverHandle)
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ assertThat(receiverAttachResponse.getSource(), is(notNullValue()));
+
+ String newTemporaryNodeAddress = ((Source) receiverAttachResponse.getSource()).getAddress();
+ assertThat(newTemporaryNodeAddress, is(notNullValue()));
+
+ assertSendingLinkSucceeds(newTemporaryNodeAddress);
+
+ interaction.doCloseConnection();
+ }
+ }
+
+ private void assertReceivingLinkFails(final Source source, final AmqpError expectedError) throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -387,7 +442,30 @@ public class TemporaryDestinationTest extends BrokerAdminUsingTestBase
.getLatestResponse(Detach.class);
assertThat(responseDetach.getClosed(), is(true));
assertThat(responseDetach.getError(), is(Matchers.notNullValue()));
- assertThat(responseDetach.getError().getCondition(), is(equalTo(AmqpError.RESOURCE_LOCKED)));
+ assertThat(responseDetach.getError().getCondition(), is(equalTo(expectedError)));
+ interaction.doCloseConnection();
+ }
+ }
+
+ private void assertSendingLinkSucceeds(final String address) throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Target target = new Target();
+ target.setAddress(address);
+
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTarget(target)
+ .attach()
+ .consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
interaction.doCloseConnection();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org