You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/10 16:54:59 UTC
[2/2] qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add more queue/basic protocol tests
QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add more queue/basic protocol tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/971fba30
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/971fba30
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/971fba30
Branch: refs/heads/master
Commit: 971fba301e50f95f827446ca921c4e37244dadbc
Parents: 89b6ea1
Author: Keith Wall <ke...@gmail.com>
Authored: Thu Dec 7 18:15:18 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Sun Dec 10 16:49:45 2017 +0000
----------------------------------------------------------------------
.../tests/protocol/v0_8/BasicInteraction.java | 24 +++++-
.../qpid/tests/protocol/v0_8/BasicTest.java | 91 ++++++++++++++++++--
.../qpid/tests/protocol/v0_8/QueueTest.java | 88 +++++++++++++++++++
3 files changed, 197 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/971fba30/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index bcac578..d5b4c62 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicGetBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicQosBody;
import org.apache.qpid.server.protocol.v0_8.transport.CompositeAMQDataBlock;
@@ -68,6 +69,9 @@ public class BasicInteraction
private String _consumeCancelTag;
private boolean _consumeCancelNoWait;
+ private String _getQueueName;
+ private boolean _getNoAck;
+
public BasicInteraction(final Interaction interaction)
{
_interaction = interaction;
@@ -223,10 +227,28 @@ public class BasicInteraction
return _interaction.sendPerformative(new BasicCancelBody(AMQShortString.valueOf(_consumeCancelTag),
_consumeCancelNoWait));
}
-
public BasicInteraction consumeCancelTag(final String consumeCancelTag)
{
_consumeCancelTag = consumeCancelTag;
return this;
}
+
+ public Interaction get() throws Exception
+ {
+ return _interaction.sendPerformative(new BasicGetBody(0,
+ AMQShortString.valueOf(_getQueueName),
+ _getNoAck));
+ }
+
+ public BasicInteraction getQueueName(final String queueName)
+ {
+ _getQueueName = queueName;
+ return this;
+ }
+
+ public BasicInteraction getNoAck(final boolean noAck)
+ {
+ _getNoAck = noAck;
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/971fba30/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
index 1eb0edd..6bcbb84 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/BasicTest.java
@@ -41,6 +41,8 @@ import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.BasicDeliverBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicGetEmptyBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicGetOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicQosOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
@@ -143,11 +145,7 @@ public class BasicTest extends BrokerAdminUsingTestBase
ContentBody content = interaction.consumeResponse(ContentBody.class).getLatestResponse(ContentBody.class);
- QpidByteBuffer payload = content.getPayload();
- byte[] contentData = new byte[payload.remaining()];
- payload.get(contentData);
- payload.dispose();
- String receivedContent = new String(contentData, StandardCharsets.UTF_8);
+ String receivedContent = getContent(content);
assertThat(receivedContent, is(equalTo(messageContent)));
assertThat(getBrokerAdmin().getQueueDepthMessages(queueName), is(equalTo(1)));
@@ -192,4 +190,87 @@ public class BasicTest extends BrokerAdminUsingTestBase
}
}
+ @Test
+ @SpecificationTest(section = "1.8.3.10", description = "direct access to a queue")
+ public void get() throws Exception
+ {
+ String messageContent = "message";
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ BasicGetOkBody response = interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().getQueueName(BrokerAdmin.TEST_QUEUE_NAME).get()
+ .consumeResponse().getLatestResponse(BasicGetOkBody.class);
+
+ long deliveryTag = response.getDeliveryTag();
+ ContentBody content = interaction.consumeResponse(ContentHeaderBody.class)
+ .consumeResponse().getLatestResponse(ContentBody.class);
+
+ String receivedContent = getContent(content);
+ assertThat(receivedContent, is(equalTo(messageContent)));
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+
+ interaction.basic().ackDeliveryTag(deliveryTag).ack()
+ .channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.8.3.10", description = "direct access to a queue")
+ public void getNoAck() throws Exception
+ {
+ String messageContent = "message";
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, messageContent);
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().getQueueName(BrokerAdmin.TEST_QUEUE_NAME).getNoAck(true).get()
+ .consumeResponse(BasicGetOkBody.class);
+
+ ContentBody content = interaction.consumeResponse(ContentHeaderBody.class)
+ .consumeResponse().getLatestResponse(ContentBody.class);
+
+ String receivedContent = getContent(content);
+ assertThat(receivedContent, is(equalTo(messageContent)));
+
+ interaction.channel().close().consumeResponse(ChannelCloseOkBody.class);
+
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.8.3.10", description = "direct access to a queue")
+ public void getEmptyQueue() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic().getQueueName(BrokerAdmin.TEST_QUEUE_NAME).get()
+ .consumeResponse().getLatestResponse(BasicGetEmptyBody.class);
+ }
+ }
+
+ private String getContent(final ContentBody content)
+ {
+ QpidByteBuffer payload = content.getPayload();
+ byte[] contentData = new byte[payload.remaining()];
+ payload.get(contentData);
+ payload.dispose();
+ return new String(contentData, StandardCharsets.UTF_8);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/971fba30/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
index 5657c85..9a52a45 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -389,6 +389,28 @@ public class QueueTest extends BrokerAdminUsingTestBase
}
@Test
+ @SpecificationTest(section = "1.7.2.9",
+ description = "The client MUST either specify a queue name or have previously declared a queue on the "
+ + "same channel")
+ public void queueDeleteDefaultQueue() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ QueueDeleteOkBody deleteResponse = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+ .consumeResponse(QueueDeclareOkBody.class)
+ .queue().delete()
+ .consumeResponse().getLatestResponse(QueueDeleteOkBody.class);
+ assertThat(deleteResponse.getMessageCount(), is(equalTo(1L)));
+ }
+ }
+
+ @Test
@SpecificationTest(section = "1.7.2.7", description = "purge a queue")
public void queuePurge() throws Exception
{
@@ -416,6 +438,21 @@ public class QueueTest extends BrokerAdminUsingTestBase
}
@Test
+ @SpecificationTest(section = "1.7.2.7", description = "The client MUST NOT attempt to purge a queue that does not exist.")
+ public void queuePurgeQueueNotFound() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ChannelCloseBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().purgeName(BrokerAdmin.TEST_QUEUE_NAME).purge()
+ .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+ assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+ }
+ }
+
+ @Test
@SpecificationTest(section = "1.7.2.3", description = "bind queue to an exchange")
public void queueBind() throws Exception
{
@@ -498,6 +535,34 @@ public class QueueTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "1.7.2.3",
+ description = "The client MUST either specify a queue name or have previously declared a queue on the same channel")
+ public void queueBindDefaultQueue() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ String testExchange = "testExchange";
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+ .consumeResponse(QueueDeclareOkBody.class)
+ .exchange().declareName(testExchange).declare()
+ .consumeResponse(ExchangeDeclareOkBody.class)
+ .queue().bindName(testExchange).bind()
+ .consumeResponse(QueueBindOkBody.class);
+
+ ExchangeBoundOkBody response = interaction.exchange()
+ .boundExchangeName(testExchange)
+ .boundQueue(BrokerAdmin.TEST_QUEUE_NAME)
+ .bound()
+ .consumeResponse()
+ .getLatestResponse(ExchangeBoundOkBody.class);
+ assertThat(response.getReplyCode(), is(equalTo(ExchangeBoundOkBody.OK)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.7.2.3",
description = "Bindings of durable queues to durable exchanges are automatically durable and the server "
+ "MUST restore such bindings after a server restart.")
public void queueDurableBind() throws Exception
@@ -599,6 +664,29 @@ public class QueueTest extends BrokerAdminUsingTestBase
}
@Test
+ @SpecificationTest(section = "1.7.2.5",
+ description = "The client MUST either specify a queue name or have previously declared a queue on the "
+ + "same channel")
+ public void queueUnbindDefaultQueue() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ String testExchange = "testExchange";
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+ .consumeResponse(QueueDeclareOkBody.class)
+ .exchange().declareName(testExchange).declare()
+ .consumeResponse(ExchangeDeclareOkBody.class)
+ .queue().bindName(testExchange).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bindRoutingKey("rk1").bind()
+ .consumeResponse(QueueBindOkBody.class)
+ .queue().unbindName(testExchange).unbindRoutingKey("rk1").unbind()
+ .consumeResponse(QueueUnbindOkBody.class);
+ }
+ }
+
+ @Test
@SpecificationTest(section = "1.7.2.5", description = "The client MUST NOT attempt to unbind a queue that does "
+ "not exist.")
public void queueUnbindUnknownQueue() throws Exception
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org