You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/06 17:47:22 UTC

qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add more exchange protocol tests

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 34e38ae1e -> 85c9caae6


QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add more exchange protocol tests


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/85c9caae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/85c9caae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/85c9caae

Branch: refs/heads/master
Commit: 85c9caae60565836487e962626896068385bad99
Parents: 34e38ae
Author: Keith Wall <ke...@gmail.com>
Authored: Wed Dec 6 08:53:38 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Wed Dec 6 17:46:32 2017 +0000

----------------------------------------------------------------------
 .../protocol/v0_8/ExchangeInteraction.java      |  19 ++
 .../qpid/tests/protocol/v0_8/ExchangeTest.java  | 261 ++++++++++++++++++-
 2 files changed, 272 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85c9caae/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
index 9218480..f62a4b3 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -71,6 +72,24 @@ public class ExchangeInteraction
         return this;
     }
 
+    public ExchangeInteraction declareType(final String type)
+    {
+        _declareType = type;
+        return this;
+    }
+
+    public ExchangeInteraction declareAutoDelete(final boolean autoDelete)
+    {
+        _declareAutoDelete = autoDelete;
+        return this;
+    }
+
+    public ExchangeInteraction declareArguments(final Map<String,Object> args)
+    {
+        _declareArguments = args == null ? Collections.emptyMap() : new HashMap<>(args);
+        return this;
+    }
+
     public Interaction declare() throws Exception
     {
         return _interaction.sendPerformative(new ExchangeDeclareBody(0,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85c9caae/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
index 08d4201..0d686dc 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
@@ -20,24 +20,30 @@
  */
 package org.apache.qpid.tests.protocol.v0_8;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
+import java.util.Collections;
 
 import org.hamcrest.Matchers;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
+import org.apache.qpid.server.exchange.ExchangeDefaults;
 import org.apache.qpid.server.protocol.ErrorCodes;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
 import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindOkBody;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -64,7 +70,7 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
             interaction.openAnonymousConnection()
                        .channel().open().consumeResponse(ChannelOpenOkBody.class)
                        .exchange().declareName(TEST_EXCHANGE).declare()
-                       .consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
+                       .consumeResponse(ExchangeDeclareOkBody.class);
 
             ExchangeBoundOkBody response = interaction.exchange()
                                                       .boundExchangeName(TEST_EXCHANGE)
@@ -78,6 +84,129 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "1.6.2.1",
+                       description = "If [passive is] set, the server will reply with Declare-Ok if the exchange "
+                                     + "already exists with the same name, and raise an error if not.  The client can "
+                                     + "use this to check whether an exchange exists without modifying the server state.")
+    public void exchangeDeclarePassive() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .exchange().declarePassive(true).declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .exchange().deleteExchangeName(TEST_EXCHANGE).delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class);
+
+            ChannelCloseBody response = interaction.exchange().declarePassive(true).declareName(TEST_EXCHANGE).declare()
+                                                        .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
+            description = "Exchange names starting with \"amq.\" are reserved for pre-declared and standardised "
+                          + "exchanges. The client MAY declare an exchange starting with  \"amq.\" if the passive "
+                          + "option is set, or the exchange already exists.")
+    public void exchangeDeclareAmqDisallowed() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declarePassive(true).declareName(ExchangeDefaults.DIRECT_EXCHANGE_NAME).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .exchange().declarePassive(false).declareName(ExchangeDefaults.DIRECT_EXCHANGE_NAME).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            ConnectionCloseBody response = interaction.exchange()
+                                                      .declarePassive(false)
+                                                      .declareName("amq.illegal")
+                                                      .declare()
+                               .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+
+            /* TODO: 0-91 specification requires 'access-refused' (403) but server uses 'not-allowed' (530) */
+            assertThat(response.getReplyCode(), anyOf(equalTo(ErrorCodes.NOT_ALLOWED), equalTo(ErrorCodes.ACCESS_REFUSED)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
+            description = "The client MUST not attempt to redeclare an existing exchange with a different type than "
+                          + "used in the original Exchange.Declare method")
+    public void exchangeRedeclareDifferentType() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            ConnectionCloseBody response = interaction.exchange()
+                                                      .declareType(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
+                                                      .declareName(TEST_EXCHANGE).declare()
+                                                      .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
+        }
+    }
+
+    @Test
+    @Ignore("The server does not implement this rule.")
+    @SpecificationTest(section = "1.6.2.1",
+            description = "When [passive] set, all other method fields [of declare] except name and no-wait are ignored.")
+    public void exchangeRedeclarePassiveDifferentType() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+                       .declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            interaction.exchange()
+                       .declarePassive(true)
+                       .declareType(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
+                       .declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
+            description = "The client MUST NOT attempt to declare an exchange with a type that the server does not "
+                          + "support.")
+    public void exchangeUnsupportedExchangeType() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange()
+                       .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+                       .declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            ConnectionCloseBody response = interaction.exchange().declarePassive(true)
+                                                      .declareType(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
+                                                      .declareName(TEST_EXCHANGE).declare()
+                                                      .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
+        }
+    }
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
             description = "If [durable is] set when creating a new exchange, the exchange will be marked as durable. "
                           + "Durable exchanges remain active when a server restarts. Non-durable exchanges (transient "
                           + "exchanges) are purged if/when a server restarts.")
@@ -89,7 +218,7 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
             interaction.openAnonymousConnection()
                        .channel().open().consumeResponse(ChannelOpenOkBody.class)
                        .exchange().declareDurable(true).declareName(TEST_EXCHANGE).declare()
-                       .consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
+                       .consumeResponse(ExchangeDeclareOkBody.class);
         }
 
         assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
@@ -131,10 +260,8 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
             assertThat(boundResponse.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
 
             interaction.exchange()
-                       .deleteExchangeName(TEST_EXCHANGE)
-                       .delete()
-                       .consumeResponse()
-                       .getLatestResponse(ExchangeDeleteOkBody.class);
+                       .deleteExchangeName(TEST_EXCHANGE).delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class);
 
             ExchangeBoundOkBody boundResponse2 = interaction.exchange()
                                                            .boundExchangeName(TEST_EXCHANGE)
@@ -148,6 +275,23 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "1.6.2.3",
+            description = "The client MUST NOT attempt to delete an exchange that does not exist.")
+    public void exchangeDeleteExchangeNotFound() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ChannelCloseBody unknownExchange = interaction.openAnonymousConnection()
+                                                          .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                          .exchange().deleteExchangeName("unknownExchange").delete()
+                                                          .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+            assertThat(unknownExchange.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+            interaction.channel().closeOk();
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.3",
             description = "If [if-unused is] set, the server will only delete the exchange if it has no queue"
                           + "bindings. If the exchange has queue bindings the server does not delete it but raises a "
                           + "channel exception instead.")
@@ -189,9 +333,110 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
                        .deleteIfUnused(true)
                        .deleteExchangeName(TEST_EXCHANGE)
                        .delete()
-                       .consumeResponse()
-                       .getLatestResponse(ExchangeDeleteOkBody.class);
+                       .consumeResponse(ExchangeDeleteOkBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.3",
+            description = "The server MUST, in each virtual host, pre-declare an exchange instance for each standard "
+                          + "exchange type that it implements.")
+    public void exchangeDeleteAmqDisallowed() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ChannelCloseBody response = interaction.openAnonymousConnection()
+                                                   .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                   .exchange()
+                                                   .deleteExchangeName(ExchangeDefaults.DIRECT_EXCHANGE_NAME).delete()
+                                                   .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
         }
     }
 
+    /** Qpid specific extension */
+    @Test
+    public void exchangeDeclareAutoDelete() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange().declareName(TEST_EXCHANGE).declareAutoDelete(true).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .queue().bindName(TEST_EXCHANGE).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bind()
+                       .consumeResponse(QueueBindOkBody.class)
+                       .queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+                       .consumeResponse(QueueDeleteOkBody.class);
+
+            ExchangeBoundOkBody boundResponse = interaction.exchange()
+                                                           .boundExchangeName(TEST_EXCHANGE)
+                                                           .bound()
+                                                           .consumeResponse()
+                                                           .getLatestResponse(ExchangeBoundOkBody.class);
+
+            assertThat(boundResponse.getReplyCode(), is(equalTo(ExchangeBoundOkBody.EXCHANGE_NOT_FOUND)));
+        }
+    }
+
+    /** Qpid specific extension */
+    @Test
+    public void exchangeDeclareWithAlternateExchange() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final String altExchName = "altExchange";
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange()
+                       .declareName(altExchName)
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class)
+                       .exchange()
+                       .declareName(TEST_EXCHANGE)
+                       .declareArguments(Collections.singletonMap("alternateExchange", altExchName)).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            ChannelCloseBody inUseResponse = interaction.exchange()
+                                                        .deleteExchangeName(altExchName)
+                                                        .delete()
+                                                        .consumeResponse().getLatestResponse(ChannelCloseBody.class);
+            assertThat(inUseResponse.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
+            interaction.channel().closeOk();
+
+            interaction.channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .exchange()
+                       .deleteExchangeName(TEST_EXCHANGE)
+                       .delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class)
+                       .exchange()
+                       .deleteExchangeName(altExchName)
+                       .delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class);
+        }
+    }
+
+    /** Qpid specific extension */
+    @Test
+    @Ignore("TODO - server fails if exchange is declared with unknown alternate exchange")
+    public void exchangeDeclareWithUnknownAlternateExchange() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ConnectionCloseBody response = interaction.openAnonymousConnection()
+                                                      .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                      .exchange()
+                                                      .declareName(TEST_EXCHANGE)
+                                                      .declareArguments(Collections.singletonMap("alternateExchange", "notKnown")).declare()
+                                                      .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+            // TODO server fails - jira required
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org