You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/11/28 23:08:00 UTC

qpid-broker-j git commit: QPID-8038: [Broker-J][AMQP 1.0] Add 0-8/9/91 queue protocol tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master c401deb7e -> 0a7368ecb


QPID-8038: [Broker-J][AMQP 1.0] Add 0-8/9/91 queue protocol tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0a7368ec
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0a7368ec
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0a7368ec

Branch: refs/heads/master
Commit: 0a7368ecb2acbee7e214cb8b6f3acd5f8d0eb31f
Parents: c401deb
Author: Keith Wall <ke...@gmail.com>
Authored: Tue Nov 28 23:04:47 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Tue Nov 28 23:04:47 2017 +0000

----------------------------------------------------------------------
 .../tests/protocol/v0_8/BasicInteraction.java   |  16 ++
 .../tests/protocol/v0_8/ChannelInteraction.java |   6 +
 .../tests/protocol/v0_8/QueueInteraction.java   |  39 +++++
 .../qpid/tests/protocol/v0_8/QueueTest.java     | 153 ++++++++++++++++++-
 4 files changed, 211 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index bf8c4eb..bcac578 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -31,6 +31,7 @@ import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
 import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelBody;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeBody;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody;
@@ -64,6 +65,9 @@ public class BasicInteraction
     private long _ackDeliveryTag;
     private boolean _ackMultiple;
 
+    private String _consumeCancelTag;
+    private boolean _consumeCancelNoWait;
+
     public BasicInteraction(final Interaction interaction)
     {
         _interaction = interaction;
@@ -213,4 +217,16 @@ public class BasicInteraction
         _ackDeliveryTag = deliveryTag;
         return this;
     }
+
+    public Interaction cancel() throws Exception
+    {
+        return _interaction.sendPerformative(new BasicCancelBody(AMQShortString.valueOf(_consumeCancelTag),
+                                                                 _consumeCancelNoWait));
+    }
+
+    public BasicInteraction consumeCancelTag(final String consumeCancelTag)
+    {
+        _consumeCancelTag = consumeCancelTag;
+        return this;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
index 51d4426..a5dcc17 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ChannelInteraction.java
@@ -22,6 +22,7 @@ package org.apache.qpid.tests.protocol.v0_8;
 
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenBody;
 
@@ -44,6 +45,11 @@ public class ChannelInteraction
         return _interaction.sendPerformative(new ChannelCloseBody(200, AMQShortString.valueOf(""), 0, 0));
     }
 
+    public Interaction closeOk() throws Exception
+    {
+        return _interaction.sendPerformative(ChannelCloseOkBody.INSTANCE);
+    }
+
     public Interaction flow(final boolean active) throws Exception
     {
         return _interaction.sendPerformative(new ChannelFlowBody(active));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
index 6e86385..1f6d47f 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/QueueInteraction.java
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
 import org.apache.qpid.server.protocol.v0_8.FieldTable;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteBody;
 
 public class QueueInteraction
 {
@@ -38,6 +39,11 @@ public class QueueInteraction
     private boolean _declareNowait;
     private Map<String, Object> _declareArguments = new HashMap<>();
 
+    private String _deleteName;
+    private boolean _deleteIfUnused;
+    private boolean _deleteIfEmpty;
+    private boolean _deleteNowait;
+
     public QueueInteraction(final Interaction interaction)
     {
         _interaction = interaction;
@@ -49,6 +55,18 @@ public class QueueInteraction
         return this;
     }
 
+    public QueueInteraction declarePassive(final boolean declarePassive)
+    {
+        _declarePassive = declarePassive;
+        return this;
+    }
+
+    public QueueInteraction declareDurable(final boolean declareDurable)
+    {
+        _declareDurable = declareDurable;
+        return this;
+    }
+
     public Interaction declare() throws Exception
     {
         return _interaction.sendPerformative(new QueueDeclareBody(0,
@@ -60,4 +78,25 @@ public class QueueInteraction
                                                                   _declareNowait,
                                                                   FieldTable.convertToFieldTable(_declareArguments)));
     }
+
+    public QueueInteraction deleteName(final String name)
+    {
+        _deleteName = name;
+        return this;
+    }
+
+    public QueueInteraction deleteIfUnused(final boolean deleteIfUnused)
+    {
+        _deleteIfUnused = deleteIfUnused;
+        return this;
+    }
+
+    public Interaction delete() throws Exception
+    {
+        return _interaction.sendPerformative(new QueueDeleteBody(0,
+                                                                 AMQShortString.valueOf(_deleteName),
+                                                                 _deleteIfUnused,
+                                                                 _deleteIfEmpty,
+                                                                 _deleteNowait));
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0a7368ec/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
index e54dad3b..a4c43ef 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/QueueTest.java
@@ -23,15 +23,22 @@ package org.apache.qpid.tests.protocol.v0_8;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
 
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.protocol.v0_8.AMQShortString;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicConsumeOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -53,15 +60,155 @@ public class QueueTest extends BrokerAdminUsingTestBase
         try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
-            final String queueName = "testQueue";
             QueueDeclareOkBody response = interaction.openAnonymousConnection()
                                                      .channel().open().consumeResponse(ChannelOpenOkBody.class)
-                                                     .queue().declareName(queueName).declare()
+                                                     .queue().declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
                                                      .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
 
-            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(queueName))));
+            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
             assertThat(response.getMessageCount(), is(equalTo(0L)));
             assertThat(response.getConsumerCount(), is(equalTo(0L)));
         }
     }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.1",
+            description = "If [declarePassive is] set, the server will reply with Declare-Ok if the queue already exists "
+                          + "with the same name, and raise an error if not.")
+    public void queueDeclarePassive() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            QueueDeclareOkBody response = interaction.openAnonymousConnection()
+                                                     .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                     .queue().declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+                                                     .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+            assertThat(response.getMessageCount(), is(equalTo(0L)));
+            assertThat(response.getConsumerCount(), is(equalTo(0L)));
+
+            getBrokerAdmin().deleteQueue(BrokerAdmin.TEST_QUEUE_NAME);
+            // Queue no longer exists: repeat the passive declare (not a delete) and expect the channel to be closed.
+            ChannelCloseBody closeResponse = interaction.queue()
+                                                        .declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+                                                        .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+            assertThat(closeResponse.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.1",
+            description = "If [declareDurable is] set when creating a new queue, the queue will be marked as durable. "
+                          + "Durable queues remain active when a server restarts.")
+    public void queueDeclareDurable() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            QueueDeclareOkBody response = interaction.openAnonymousConnection()
+                                                     .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                     .queue().declareDurable(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+                                                     .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+            assertThat(response.getMessageCount(), is(equalTo(0L)));
+            assertThat(response.getConsumerCount(), is(equalTo(0L)));
+        }
+
+        assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
+        getBrokerAdmin().restart();
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            QueueDeclareOkBody response = interaction.openAnonymousConnection()
+                                                     .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                     .queue().declarePassive(true).declareName(BrokerAdmin.TEST_QUEUE_NAME).declare()
+                                                     .consumeResponse().getLatestResponse(QueueDeclareOkBody.class);
+
+            assertThat(response.getQueue(), is(equalTo(AMQShortString.valueOf(BrokerAdmin.TEST_QUEUE_NAME))));
+            assertThat(response.getMessageCount(), is(equalTo(0L)));
+            assertThat(response.getConsumerCount(), is(equalTo(0L)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.9", description = "delete a queue")
+    public void queueDelete() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "message");
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            QueueDeleteOkBody response = interaction.openAnonymousConnection()
+                                                    .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                    .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+                                                    .consumeResponse().getLatestResponse(QueueDeleteOkBody.class);
+
+            assertThat(response.getMessageCount(), is(equalTo(1L)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.9",
+                       description = "The client MUST NOT attempt to delete a queue that does not exist.")
+    public void queueDeleteQueueNotFound() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ChannelCloseBody response = interaction.openAnonymousConnection()
+                                                   .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                   .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+                                                   .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.7.2.9",
+            description = "If [deleteIfUnused is] set, the server will only delete the queue if it has no consumers. "
+                          + "If the queue has consumers the server does does not delete it but raises a channel "
+                          + "exception instead..")
+    public void queueDeleteWithConsumer() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport consumerTransport = new FrameTransport(_brokerAddress).connect();
+            FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final String consumerTag = "A";
+            final Interaction consumerInteraction = consumerTransport.newInteraction();
+            final BasicInteraction basicInteraction = consumerInteraction.openAnonymousConnection()
+                                                                         .channel()
+                                                                         .open()
+                                                                         .consumeResponse(ChannelOpenOkBody.class)
+                                                                         .basic();
+            basicInteraction.consumeConsumerTag(consumerTag).consumeQueue(BrokerAdmin.TEST_QUEUE_NAME).consume()
+                            .consumeResponse(BasicConsumeOkBody.class);
+
+            final Interaction deleterInteraction = transport.newInteraction();
+            ChannelCloseBody response = deleterInteraction.openAnonymousConnection()
+                                                          .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                          .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).deleteIfUnused(true).delete()
+                                                          .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.IN_USE)));
+            deleterInteraction.channel().closeOk();
+
+            basicInteraction.consumeCancelTag(consumerTag).cancel()
+                            .consumeResponse().getLatestResponse(BasicCancelOkBody.class);
+
+            deleterInteraction.channel().open().consumeResponse(ChannelOpenOkBody.class)
+                              .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+                              .consumeResponse().getLatestResponse(QueueDeleteOkBody.class);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org