You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/06 17:47:22 UTC
qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add
more exchange protocol tests
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 34e38ae1e -> 85c9caae6
QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add more exchange protocol tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/85c9caae
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/85c9caae
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/85c9caae
Branch: refs/heads/master
Commit: 85c9caae60565836487e962626896068385bad99
Parents: 34e38ae
Author: Keith Wall <ke...@gmail.com>
Authored: Wed Dec 6 08:53:38 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Wed Dec 6 17:46:32 2017 +0000
----------------------------------------------------------------------
.../protocol/v0_8/ExchangeInteraction.java | 19 ++
.../qpid/tests/protocol/v0_8/ExchangeTest.java | 261 ++++++++++++++++++-
2 files changed, 272 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85c9caae/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
index 9218480..f62a4b3 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/ExchangeInteraction.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -71,6 +72,24 @@ public class ExchangeInteraction
return this;
}
+    /**
+     * Records the exchange type on this interaction.
+     *
+     * @param type the exchange type name to store
+     * @return this instance, so that calls can be chained
+     */
+    public ExchangeInteraction declareType(final String type)
+    {
+        this._declareType = type;
+        return this;
+    }
+
+    /**
+     * Records the auto-delete flag on this interaction.
+     *
+     * @param autoDelete the auto-delete flag value to store
+     * @return this instance, so that calls can be chained
+     */
+    public ExchangeInteraction declareAutoDelete(final boolean autoDelete)
+    {
+        this._declareAutoDelete = autoDelete;
+        return this;
+    }
+
+    /**
+     * Records the declare arguments on this interaction.
+     * <p>
+     * A {@code null} argument is stored as an empty map; otherwise a copy of the
+     * supplied map is stored, so later changes to the caller's map are not observed.
+     *
+     * @param args the arguments to store, may be {@code null}
+     * @return this instance, so that calls can be chained
+     */
+    public ExchangeInteraction declareArguments(final Map<String,Object> args)
+    {
+        if (args == null)
+        {
+            _declareArguments = Collections.emptyMap();
+        }
+        else
+        {
+            _declareArguments = new HashMap<>(args);
+        }
+        return this;
+    }
+
public Interaction declare() throws Exception
{
return _interaction.sendPerformative(new ExchangeDeclareBody(0,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/85c9caae/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
index 08d4201..0d686dc 100644
--- a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/ExchangeTest.java
@@ -20,24 +20,30 @@
*/
package org.apache.qpid.tests.protocol.v0_8;
+import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
+import java.util.Collections;
import org.hamcrest.Matchers;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
+import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.qpid.server.protocol.ErrorCodes;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeBoundOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeclareOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ExchangeDeleteOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueBindOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueUnbindOkBody;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -64,7 +70,7 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
interaction.openAnonymousConnection()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareName(TEST_EXCHANGE).declare()
- .consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
+ .consumeResponse(ExchangeDeclareOkBody.class);
ExchangeBoundOkBody response = interaction.exchange()
.boundExchangeName(TEST_EXCHANGE)
@@ -78,6 +84,129 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "1.6.2.1",
+            description = "If [passive is] set, the server will reply with Declare-Ok if the exchange "
+                          + "already exists with the same name, and raise an error if not. The client can "
+                          + "use this to check whether an exchange exists without modifying the server state.")
+    public void exchangeDeclarePassive() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Create the exchange, then check that a passive declare of it succeeds.
+            interaction.exchange().declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+            interaction.exchange().declarePassive(true).declareName(TEST_EXCHANGE).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            // Remove the exchange again so that the next passive declare targets a missing exchange.
+            interaction.exchange().deleteExchangeName(TEST_EXCHANGE).delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class);
+
+            final ChannelCloseBody channelClose = interaction.exchange()
+                                                             .declarePassive(true)
+                                                             .declareName(TEST_EXCHANGE)
+                                                             .declare()
+                                                             .consumeResponse()
+                                                             .getLatestResponse(ChannelCloseBody.class);
+            assertThat(channelClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
+            description = "Exchange names starting with \"amq.\" are reserved for pre-declared and standardised "
+                          + "exchanges. The client MAY declare an exchange starting with \"amq.\" if the passive "
+                          + "option is set, or the exchange already exists.")
+    public void exchangeDeclareAmqDisallowed() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Permitted case: passive declare of an "amq." exchange.
+            interaction.exchange().declarePassive(true).declareName(ExchangeDefaults.DIRECT_EXCHANGE_NAME).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            // Permitted case: non-passive declare of an "amq." exchange that already exists.
+            interaction.exchange().declarePassive(false).declareName(ExchangeDefaults.DIRECT_EXCHANGE_NAME).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            // Disallowed case: non-passive declare of a new "amq." name.
+            final ConnectionCloseBody connectionClose =
+                    interaction.exchange().declarePassive(false).declareName("amq.illegal").declare()
+                               .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+
+            /* TODO: 0-91 specification requires 'access-refused' (403) but server uses 'not-allowed' (530) */
+            assertThat(connectionClose.getReplyCode(),
+                       anyOf(equalTo(ErrorCodes.NOT_ALLOWED), equalTo(ErrorCodes.ACCESS_REFUSED)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
+            description = "The client MUST not attempt to redeclare an existing exchange with a different type than "
+                          + "used in the original Exchange.Declare method")
+    public void exchangeRedeclareDifferentType() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // First declaration creates the exchange with the direct type.
+            interaction.exchange()
+                       .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+                       .declareName(TEST_EXCHANGE)
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            // Second declaration reuses the name with the topic type; a connection close is expected.
+            final ConnectionCloseBody connectionClose = interaction.exchange()
+                                                                   .declareType(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
+                                                                   .declareName(TEST_EXCHANGE)
+                                                                   .declare()
+                                                                   .consumeResponse()
+                                                                   .getLatestResponse(ConnectionCloseBody.class);
+
+            assertThat(connectionClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
+        }
+    }
+
+    @Test
+    @Ignore("The server does not implement this rule.")
+    @SpecificationTest(section = "1.6.2.1",
+            description = "When [passive] set, all other method fields [of declare] except name and no-wait are ignored.")
+    public void exchangeRedeclarePassiveDifferentType() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Create the exchange with the direct type.
+            interaction.exchange()
+                       .declareType(ExchangeDefaults.DIRECT_EXCHANGE_CLASS)
+                       .declareName(TEST_EXCHANGE)
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            // A passive declare naming a different type must still be answered with Declare-Ok,
+            // because the rule under test says the type field is ignored when passive is set.
+            interaction.exchange()
+                       .declarePassive(true)
+                       .declareType(ExchangeDefaults.TOPIC_EXCHANGE_CLASS)
+                       .declareName(TEST_EXCHANGE)
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.1",
+            description = "The client MUST NOT attempt to declare an exchange with a type that the server does not "
+                          + "support.")
+    public void exchangeUnsupportedExchangeType() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+
+            // Declare a new exchange using a type name that is not one of the standard exchange
+            // types. The previous body re-declared an existing direct exchange passively as topic,
+            // which exercises the "different type" rule rather than the "unsupported type" rule
+            // quoted in the description above.
+            ConnectionCloseBody response = interaction.openAnonymousConnection()
+                                                      .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                      .exchange()
+                                                      .declareType("myUnsupportedExchangeType")
+                                                      .declareName(TEST_EXCHANGE).declare()
+                                                      .consumeResponse().getLatestResponse(ConnectionCloseBody.class);
+
+            // AMQP 0-9-1 assigns the 'command-invalid' (503) error code to this rule.
+            // NOTE(review): confirm the broker reports COMMAND_INVALID here.
+            assertThat(response.getReplyCode(), is(equalTo(ErrorCodes.COMMAND_INVALID)));
+        }
+    }
+ @Test
+ @SpecificationTest(section = "1.6.2.1",
description = "If [durable is] set when creating a new exchange, the exchange will be marked as durable. "
+ "Durable exchanges remain active when a server restarts. Non-durable exchanges (transient "
+ "exchanges) are purged if/when a server restarts.")
@@ -89,7 +218,7 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
interaction.openAnonymousConnection()
.channel().open().consumeResponse(ChannelOpenOkBody.class)
.exchange().declareDurable(true).declareName(TEST_EXCHANGE).declare()
- .consumeResponse().getLatestResponse(ExchangeDeclareOkBody.class);
+ .consumeResponse(ExchangeDeclareOkBody.class);
}
assumeThat(getBrokerAdmin().supportsRestart(), Matchers.is(true));
@@ -131,10 +260,8 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
assertThat(boundResponse.getReplyCode(), is(equalTo(ExchangeBoundOkBody.NO_BINDINGS)));
interaction.exchange()
- .deleteExchangeName(TEST_EXCHANGE)
- .delete()
- .consumeResponse()
- .getLatestResponse(ExchangeDeleteOkBody.class);
+ .deleteExchangeName(TEST_EXCHANGE).delete()
+ .consumeResponse(ExchangeDeleteOkBody.class);
ExchangeBoundOkBody boundResponse2 = interaction.exchange()
.boundExchangeName(TEST_EXCHANGE)
@@ -148,6 +275,23 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "1.6.2.3",
+            description = "The client MUST NOT attempt to delete an exchange that does not exist.")
+    public void exchangeDeleteExchangeNotFound() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Deleting a name that was never declared is expected to close the channel.
+            final ChannelCloseBody channelClose = interaction.exchange()
+                                                             .deleteExchangeName("unknownExchange")
+                                                             .delete()
+                                                             .consumeResponse()
+                                                             .getLatestResponse(ChannelCloseBody.class);
+            assertThat(channelClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+
+            // Acknowledge the channel close sent by the peer.
+            interaction.channel().closeOk();
+        }
+    }
+
+ @Test
+ @SpecificationTest(section = "1.6.2.3",
description = "If [if-unused is] set, the server will only delete the exchange if it has no queue"
+ "bindings. If the exchange has queue bindings the server does not delete it but raises a "
+ "channel exception instead.")
@@ -189,9 +333,110 @@ public class ExchangeTest extends BrokerAdminUsingTestBase
.deleteIfUnused(true)
.deleteExchangeName(TEST_EXCHANGE)
.delete()
- .consumeResponse()
- .getLatestResponse(ExchangeDeleteOkBody.class);
+ .consumeResponse(ExchangeDeleteOkBody.class);
+ }
+ }
+
+    @Test
+    @SpecificationTest(section = "1.6.2.3",
+            description = "The server MUST, in each virtual host, pre-declare an exchange instance for each standard "
+                          + "exchange type that it implements.")
+    public void exchangeDeleteAmqDisallowed() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Attempt to delete the exchange named by ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+            // a channel close with NOT_ALLOWED is expected.
+            final ChannelCloseBody channelClose = interaction.exchange()
+                                                             .deleteExchangeName(ExchangeDefaults.DIRECT_EXCHANGE_NAME)
+                                                             .delete()
+                                                             .consumeResponse()
+                                                             .getLatestResponse(ChannelCloseBody.class);
+            assertThat(channelClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
}
}
+    /** Qpid specific extension */
+    @Test
+    public void exchangeDeclareAutoDelete() throws Exception
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Declare an auto-delete exchange and bind the test queue to it.
+            interaction.exchange().declareName(TEST_EXCHANGE).declareAutoDelete(true).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+            interaction.queue().bindName(TEST_EXCHANGE).bindQueueName(BrokerAdmin.TEST_QUEUE_NAME).bind()
+                       .consumeResponse(QueueBindOkBody.class);
+
+            // Deleting the queue removes the binding made above.
+            interaction.queue().deleteName(BrokerAdmin.TEST_QUEUE_NAME).delete()
+                       .consumeResponse(QueueDeleteOkBody.class);
+
+            // The exchange is now expected to be reported as not found.
+            final ExchangeBoundOkBody boundOk = interaction.exchange()
+                                                           .boundExchangeName(TEST_EXCHANGE)
+                                                           .bound()
+                                                           .consumeResponse()
+                                                           .getLatestResponse(ExchangeBoundOkBody.class);
+
+            assertThat(boundOk.getReplyCode(), is(equalTo(ExchangeBoundOkBody.EXCHANGE_NOT_FOUND)));
+        }
+    }
+
+    /** Qpid specific extension */
+    @Test
+    public void exchangeDeclareWithAlternateExchange() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final String alternateName = "altExchange";
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Declare the alternate first, then the test exchange that refers to it.
+            interaction.exchange().declareName(alternateName).declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+            interaction.exchange()
+                       .declareName(TEST_EXCHANGE)
+                       .declareArguments(Collections.singletonMap("alternateExchange", alternateName))
+                       .declare()
+                       .consumeResponse(ExchangeDeclareOkBody.class);
+
+            // While the test exchange still refers to it, deleting the alternate is expected to be refused.
+            final ChannelCloseBody refused = interaction.exchange()
+                                                        .deleteExchangeName(alternateName)
+                                                        .delete()
+                                                        .consumeResponse()
+                                                        .getLatestResponse(ChannelCloseBody.class);
+            assertThat(refused.getReplyCode(), is(equalTo(ErrorCodes.NOT_ALLOWED)));
+            interaction.channel().closeOk();
+
+            // Open a channel again, then delete the referring exchange followed by the alternate.
+            interaction.channel().open().consumeResponse(ChannelOpenOkBody.class);
+            interaction.exchange().deleteExchangeName(TEST_EXCHANGE).delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class);
+            interaction.exchange().deleteExchangeName(alternateName).delete()
+                       .consumeResponse(ExchangeDeleteOkBody.class);
+        }
+    }
+
+    /** Qpid specific extension */
+    @Test
+    @Ignore("TODO - server fails if exchange is declared with unknown alternate exchange")
+    public void exchangeDeclareWithUnknownAlternateExchange() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class);
+
+            // Declare an exchange whose alternate exchange argument names an exchange that was never declared.
+            final ConnectionCloseBody connectionClose =
+                    interaction.exchange()
+                               .declareName(TEST_EXCHANGE)
+                               .declareArguments(Collections.singletonMap("alternateExchange", "notKnown"))
+                               .declare()
+                               .consumeResponse()
+                               .getLatestResponse(ConnectionCloseBody.class);
+            // TODO server fails - jira required
+            assertThat(connectionClose.getReplyCode(), is(equalTo(ErrorCodes.NOT_FOUND)));
+        }
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org