You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/11/28 23:08:00 UTC
qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 1.0] Add
0-8/9/91 queue protocol tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master c401deb7e -> 0a7368ecb
QPID-8038: [Broker-J][AMQP 1.0] Add 0-8/9/91 queue protocol tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0a7368ec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0a7368ec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0a7368ec
Branch: refs/heads/master
Commit: 0a7368ecb2acbee7e214cb8b6f3acd5f8d0eb31f
Parents: c401deb
Author: Keith Wall <ke...@gmail.com>
Authored: Tue Nov 28 23:04:47 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Tue Nov 28 23:04:47 2017 +0000
----------------------------------------------------------------------
.../tests/protocol/v0_8/BasicInteraction.java | 16 ++
.../tests/protocol/v0_8/ChannelInteraction.java | 6 +
.../tests/protocol/v0_8/QueueInteraction.java | 39 +++++
.../qpid/tests/protocol/v0_8/QueueTest.java | 153 ++++++++++++++++++-
4 files changed, 211 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index bf8c4eb..bcac578 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody;
@@ -64,6 +65,9 @@ public class BasicInteraction
private long _ackDeliveryTag;
private boolean _ackMultiple;
+ private String _consumeCancelTag;
+ private boolean _consumeCancelNoWait;
+
public BasicInteraction(final Interaction interaction)
{
_interaction = interaction;
@@ -213,4 +217,16 @@ public class BasicInteraction
_ackDeliveryTag = deliveryTag;
return this;
}
+
+ public Interaction cancel() throws Exception
+ {
+ return _interaction.sendPerformative(new BasicCancelBody(AMQShortString.valueOf(_consumeCancelTag),
+ _consumeCancelNoWait));
+ }
+
+ public BasicInteraction consumeCancelTag(final String consumeCancelTag)
+ {
+ _consumeCancelTag = consumeCancelTag;
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
index 51d4426..a5dcc17 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
@@ -22,6 +22,7 @@ package org.apache.qpid.tests.protocol.v0_8;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenBody;
@@ -44,6 +45,11 @@ public class ChannelInteraction
return _interaction.sendPerformative(new ChannelCloseBody(200, AMQShortString.valueOf(""), 0, 0));
}
+ public Interaction closeOk() throws Exception
+ {
+ return _interaction.sendPerformative(ChannelCloseOkBody.INSTANCE);
+ }
+
public Interaction flow(final boolean active) throws Exception
{
return _interaction.sendPerformative(new ChannelFlowBody(active));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
index 6e86385..1f6d47f 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
@@ -26,6 +26,7 @@ import java.util.Map;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
import org.apache.qpid.server.protocol.v0_8.FieldTable;
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteBody;
public class QueueInteraction
{
@@ -38,6 +39,11 @@ public class QueueInteraction
private boolean _declareNowait;
private Map<String, Object> _declareArguments = new HashMap<>();
+ private String _deleteName;
+ private boolean _deleteIfUnused;
+ private boolean _deleteIfEmpty;
+ private boolean _deleteNowait;
+
public QueueInteraction(final Interaction interaction)
{
_interaction = interaction;
@@ -49,6 +55,18 @@ public class QueueInteraction
return this;
}
+ public QueueInteraction declarePassive(final boolean declarePassive)
+ {
+ _declarePassive = declarePassive;
+ return this;
+ }
+
+ public QueueInteraction declareDurable(final boolean declareDurable)
+ {
+ _declareDurable = declareDurable;
+ return this;
+ }
+
public Interaction declare() throws Exception
{
return _interaction.sendPerformative(new QueueDeclareBody(0,
@@ -60,4 +78,25 @@ public class QueueInteraction
_declareNowait,
FieldTable.convertToFieldTable(_declareArguments)));
}
+
+ public QueueInteraction deleteName(final String name)
+ {
+ _deleteName = name;
+ return this;
+ }
+
+ public QueueInteraction deleteIfUnused(final boolean deleteIfUnused)
+ {
+ _deleteIfUnused = deleteIfUnused;
+ return this;
+ }
+
+ public Interaction delete() throws Exception
+ {
+ return _interaction.sendPerformative(new QueueDeleteBody(0,
+ AMQShortString.valueOf(_deleteName),
+ _deleteIfUnused,
+ _deleteIfEmpty,
+ _deleteNowait));
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
index e54dad3b..a4c43ef 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -23,15 +23,22 @@ package org.apache.qpid.tests.protocol.v0_8;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
+import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
+import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -53,15 +60,155 @@ public class QueueTest extends BrokerAdminUsingTestBase
try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- final String queueName = "testQueue";
QueueDeclareOkBody response = interaction.openAnonymousConnection()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
- .queue().declareName(queueName).declare()
+ .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
.consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
- assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(queueName))));
+ assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
assertThat(response.getMessageCount(), is(equalTo(0L)));
assertThat(response.getConsumerCount(), is(equalTo(0L)));
}
}
+
+ @Test
+ @SpecificationTest(section = "1.7.2.1",
+ description = "If [declarePassive is] set, the server will reply with Declare-Ok if the queue already exists"
+ + "with the same name, and raise an error if not.")
+ public void queueDeclarePassive() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+ .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+ assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+ assertThat(response.getMessageCount(), is(equalTo(0L)));
+ assertThat(response.getConsumerCount(), is(equalTo(0L)));
+
+ getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ ChannelCloseBody closeResponse = interaction.queue()
+ .deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+ .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+ assertThat(closeResponse.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.7.2.1",
+ description = "If [declareDurable is] set when creating a new queue, the queue will be marked as durable."
+ + "Durable queues remain active when a server restarts.")
+ public void queueDeclareDurable() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declareDurable(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+ .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+ assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+ assertThat(response.getMessageCount(), is(equalTo(0L)));
+ assertThat(response.getConsumerCount(), is(equalTo(0L)));
+ }
+
+ assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
+ getBrokerAdmin().restart();
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ QueueDeclareOkBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+ .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+ assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+ assertThat(response.getMessageCount(), is(equalTo(0L)));
+ assertThat(response.getConsumerCount(), is(equalTo(0L)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.7.2.9", description = "delete a queue")
+ public void queueDelete() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
+
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ QueueDeleteOkBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+ .consumeResponse().getLatestResponse(QueueDeleteOkBody.class);
+
+ assertThat(response.getMessageCount(), is(equalTo(1L)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.7.2.9",
+ description = "The client MUST NOT attempt to delete a queue that does not exist.")
+ public void queueDeleteQueueNotFound() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ChannelCloseBody response = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+ .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+
+ assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "1.7.2.9",
+ description = "If [deleteIfUnused is] set, the server will only delete the queue if it has no consumers. "
+ + "If the queue has consumers the server does does not delete it but raises a channel "
+ + "exception instead..")
+ public void queueDeleteWithConsumer() throws Exception
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+ try(FrameTransport consumerTransport = new FrameTransport(_brokerAddress).connect();
+ FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final String consumerTag = "A";
+ final Interaction consumerInteraction = consumerTransport.newInteraction();
+ final BasicInteraction basicInteraction = consumerInteraction.openAnonymousConnection()
+ .channel()
+ .open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic();
+ basicInteraction.consumeConsumerTag(consumerTag).consumeQueue(BrokerAdmin.TEST_QUEUE_NAME).consume()
+ .consumeResponse(BasicConsumeOkBody.class);
+
+ final Interaction deleterInteraction = transport.newInteraction();
+ ChannelCloseBody response = deleterInteraction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).deleteIfUnused(true).delete()
+ .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+
+ assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.IN_USE)));
+ deleterInteraction.channel().closeOk();
+
+ basicInteraction.consumeCancelTag(consumerTag).cancel()
+ .consumeResponse().getLatestResponse(BasicCancelOkBody.class);
+
+ deleterInteraction.channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+ .consumeResponse().getLatestResponse(QueueDeleteOkBody.class);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org