You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/10 22:51:50 UTC
qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add
publisher confirms tests (RabbitMQ extension to AMQP 0-91
Repository: qpid-broker-j
Updated Branches:
refs/heads/master f0c04045a -> 79aecbce0
QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add publisher confirms tests (RabbitMQ extension to AMQP 0-91
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/79aecbce
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/79aecbce
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/79aecbce
Branch: refs/heads/master
Commit: 79aecbce08c208cbaa61ba61fc0fe4abb61cd72b
Parents: f0c0404
Author: Keith Wall <ke...@gmail.com>
Authored: Sun Dec 10 22:50:38 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Sun Dec 10 22:50:38 2017 +0000
----------------------------------------------------------------------
.../tests/protocol/v0_8/BasicInteraction.java | 8 +
.../tests/protocol/v0_8/FrameTransport.java | 2 +-
.../confirms/PublisherConfirmsTest.java | 154 +++++++++++++++++++
3 files changed, 163 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/79aecbce/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 7f91d8d..35ff42a 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.protocol.v0_8.transport.BasicGetBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicQosBody;
import org.apache.qpid.server.protocol.v0_8.transport.CompositeAMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.ConfirmSelectBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
@@ -72,6 +73,8 @@ public class BasicInteraction
private String _getQueueName;
private boolean _getNoAck;
+ private boolean _confirmSelectNoWait;
+
public BasicInteraction(final Interaction interaction)
{
_interaction = interaction;
@@ -270,4 +273,9 @@ public class BasicInteraction
_getNoAck = noAck;
return this;
}
+
+ public Interaction confirmSelect() throws Exception
+ {
+ return _interaction.sendPerformative(new ConfirmSelectBody(_confirmSelectNoWait));
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/79aecbce/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
index 0b5c4e4..432eab8 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -34,7 +34,7 @@ public class FrameTransport extends AbstractFrameTransport<Interaction>
private final byte[] _protocolHeader;
private ProtocolVersion _protocolVersion;
- FrameTransport(final InetSocketAddress brokerAddress)
+ public FrameTransport(final InetSocketAddress brokerAddress)
{
this(brokerAddress, Protocol.AMQP_0_9_1);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/79aecbce/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
new file mode 100644
index 0000000..be2d444
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.confirms;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicNackBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConfirmSelectOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+/**
+ * The specification for publisher confirms is: https://www.rabbitmq.com/confirms.html
+ */
+public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
+{
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ }
+
+ @Test
+ @SpecificationTest(section = "https://www.rabbitmq.com/confirms.html",
+ description = "Once a channel is in confirm mode, both the broker and the client count messages "
+ + "(counting starts at 1 on the first confirm.select)The broker then confirms messages as "
+ + "it handles them by sending a basic.ack on the same channel. The delivery-tag field "
+ + "contains the sequence number of the confirmed message." )
+ public void publishMessage() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ BasicAckBody ackBody = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .basic().confirmSelect().consumeResponse(ConfirmSelectOkBody.class)
+ .basic().publishExchange("")
+ .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+ .content("Test")
+ .publishMessage()
+ .consumeResponse().getLatestResponse(BasicAckBody.class);
+
+ assertThat(ackBody.getDeliveryTag(), is(equalTo(1L)));
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "https://www.rabbitmq.com/confirms.html",
+ description = "[...] when the broker is unable to handle messages successfully, instead of a basic.ack,"
+ + "the broker will send a basic.nack.")
+ public void publishUnrouteableMandatoryMessage() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ BasicNackBody nackBody = interaction.openAnonymousConnection()
+ .channel()
+ .open()
+ .consumeResponse(ChannelOpenOkBody.class)
+ .basic()
+ .confirmSelect()
+ .consumeResponse(ConfirmSelectOkBody.class)
+ .basic()
+ .publishExchange("")
+ .publishRoutingKey("unrouteable")
+ .publishMandatory(true)
+ .content("Test")
+ .publishMessage()
+ .consumeResponse()
+ .getLatestResponse(BasicNackBody.class);
+ assertThat(nackBody.getDeliveryTag(), is(equalTo(1L)));
+ }
+ }
+
+ @Test
+ @Ignore("QPID-7948 unrouteable message sent without mandatory true is neither ack'd nor nack'd")
+ public void publishUnrouteableMessage() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .basic().confirmSelect().consumeResponse(ConfirmSelectOkBody.class)
+ .basic().publishExchange("")
+ .publishRoutingKey("unrouteable")
+ .publishMandatory(false)
+ .content("Test")
+ .publishMessage()
+ .consumeResponse(BasicAckBody.class);
+ }
+ }
+
+ @Test
+ @Ignore
+ @SpecificationTest(section = "https://www.rabbitmq.com/confirms.html",
+ description = "A transactional channel cannot be put into confirm mode and once a channel is in confirm "
+ + "mode, it cannot be made transactional.")
+ public void transactionChannelCannotEnableConfirms() throws Exception
+ {
+ try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ ChannelCloseBody closeBody = interaction.openAnonymousConnection()
+ .channel().open().consumeResponse(ChannelOpenOkBody.class)
+ .tx().select()
+ .consumeResponse(TxSelectOkBody.class)
+ .basic()
+ .confirmSelect()
+ .consumeResponse()
+ .getLatestResponse(ChannelCloseBody.class);
+ // Check behaviour of RabbitMQ.
+ assertThat(closeBody.getReplyCode(), is(equalTo(ErrorCodes.COMMAND_INVALID)));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org