You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2017/12/10 22:51:50 UTC

qpid-broker-j git commit: QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add publisher confirms tests (RabbitMQ extension to AMQP 0-91)

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master f0c04045a -> 79aecbce0


QPID-8038: [Broker-J] [AMQP 0-8..0-91] Add publisher confirms tests (RabbitMQ extension to AMQP 0-91)


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/79aecbce
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/79aecbce
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/79aecbce

Branch: refs/heads/master
Commit: 79aecbce08c208cbaa61ba61fc0fe4abb61cd72b
Parents: f0c0404
Author: Keith Wall <ke...@gmail.com>
Authored: Sun Dec 10 22:50:38 2017 +0000
Committer: Keith Wall <ke...@gmail.com>
Committed: Sun Dec 10 22:50:38 2017 +0000

----------------------------------------------------------------------
 .../tests/protocol/v0_8/BasicInteraction.java   |   8 +
 .../tests/protocol/v0_8/FrameTransport.java     |   2 +-
 .../confirms/PublisherConfirmsTest.java         | 154 +++++++++++++++++++
 3 files changed, 163 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/79aecbce/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
index 7f91d8d..35ff42a 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/BasicInteraction.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.protocol.v0_8.transport.BasicGetBody;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicPublishBody;
 import org.apache.qpid.server.protocol.v0_8.transport.BasicQosBody;
 import org.apache.qpid.server.protocol.v0_8.transport.CompositeAMQDataBlock;
+import org.apache.qpid.server.protocol.v0_8.transport.ConfirmSelectBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
 import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
 
@@ -72,6 +73,8 @@ public class BasicInteraction
     private String _getQueueName;
     private boolean _getNoAck;
 
+    private boolean _confirmSelectNoWait;
+
     public BasicInteraction(final Interaction interaction)
     {
         _interaction = interaction;
@@ -270,4 +273,9 @@ public class BasicInteraction
         _getNoAck = noAck;
         return this;
     }
+
+    /**
+     * Builds a {@link ConfirmSelectBody} from the current no-wait setting and sends it
+     * through the owning interaction.
+     *
+     * NOTE(review): no setter for {@code _confirmSelectNoWait} is visible in this change,
+     * so the flag presumably keeps its default value - confirm against the full class.
+     *
+     * @return the interaction returned by {@code sendPerformative}, for call chaining
+     * @throws Exception if sending the performative fails
+     */
+    public Interaction confirmSelect() throws Exception
+    {
+        final ConfirmSelectBody confirmSelect = new ConfirmSelectBody(_confirmSelectNoWait);
+        return _interaction.sendPerformative(confirmSelect);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/79aecbce/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
index 0b5c4e4..432eab8 100644
--- a/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
+++ b/systests/protocol-tests-amqp-0-8/src/main/java/org/apache/qpid/tests/protocol/v0_8/FrameTransport.java
@@ -34,7 +34,7 @@ public class FrameTransport extends AbstractFrameTransport<Interaction>
     private final byte[] _protocolHeader;
     private ProtocolVersion _protocolVersion;
 
-    FrameTransport(final InetSocketAddress brokerAddress)
+    public FrameTransport(final InetSocketAddress brokerAddress)
     {
         this(brokerAddress, Protocol.AMQP_0_9_1);
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/79aecbce/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
new file mode 100644
index 0000000..be2d444
--- /dev/null
+++ b/systests/protocol-tests-amqp-0-8/src/test/java/org/apache/qpid/tests/protocol/v0_8/extension/confirms/PublisherConfirmsTest.java
@@ -0,0 +1,154 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v0_8.extension.confirms;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.net.InetSocketAddress;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.ErrorCodes;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicAckBody;
+import org.apache.qpid.server.protocol.v0_8.transport.BasicNackBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelCloseBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ChannelOpenOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.ConfirmSelectOkBody;
+import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
+import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v0_8.FrameTransport;
+import org.apache.qpid.tests.protocol.v0_8.Interaction;
+import org.apache.qpid.tests.utils.BrokerAdmin;
+import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+
+/**
+ * Protocol tests for publisher confirms, driven over a {@link FrameTransport} connection
+ * to the broker's anonymous AMQP port.
+ * <p>
+ * The specification for publisher confirms is:  https://www.rabbitmq.com/confirms.html
+ */
+public class PublisherConfirmsTest extends BrokerAdminUsingTestBase
+{
+    /** Broker address every test connects to; assigned in {@link #setUp()}. */
+    private InetSocketAddress _brokerAddress;
+
+    /**
+     * Looks up the address of the broker's anonymous AMQP port and creates the test queue
+     * ({@code BrokerAdmin.TEST_QUEUE_NAME}) before each test.
+     */
+    @Before
+    public void setUp()
+    {
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+    }
+
+    /**
+     * Enables confirms on a fresh channel, publishes one message routed to the test queue and
+     * expects a {@link BasicAckBody} carrying delivery tag 1, with the queue then holding one message.
+     *
+     * @throws Exception if the protocol interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "https://www.rabbitmq.com/confirms.html",
+            description = "Once a channel is in confirm mode, both the broker and the client count messages "
+                          + "(counting starts at 1 on the first confirm.select). The broker then confirms messages as "
+                          + "it handles them by sending a basic.ack on the same channel. The delivery-tag field "
+                          + "contains the sequence number of the confirmed message.")
+    public void publishMessage() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            BasicAckBody ackBody = interaction.openAnonymousConnection()
+                                              .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                              .basic().confirmSelect().consumeResponse(ConfirmSelectOkBody.class)
+                                              .basic().publishExchange("")
+                                              .publishRoutingKey(BrokerAdmin.TEST_QUEUE_NAME)
+                                              .content("Test")
+                                              .publishMessage()
+                                              .consumeResponse().getLatestResponse(BasicAckBody.class);
+
+            // First message published after confirm.select, so its sequence number must be 1.
+            assertThat(ackBody.getDeliveryTag(), is(equalTo(1L)));
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+        }
+    }
+
+    /**
+     * Enables confirms, publishes a mandatory message with the routing key "unrouteable" and
+     * expects a {@link BasicNackBody} carrying delivery tag 1.
+     *
+     * @throws Exception if the protocol interaction fails
+     */
+    @Test
+    @SpecificationTest(section = "https://www.rabbitmq.com/confirms.html",
+            description = "[...] when the broker is unable to handle messages successfully, instead of a basic.ack, "
+                          + "the broker will send a basic.nack.")
+    public void publishUnrouteableMandatoryMessage() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            BasicNackBody nackBody = interaction.openAnonymousConnection()
+                                                      .channel()
+                                                      .open()
+                                                      .consumeResponse(ChannelOpenOkBody.class)
+                                                      .basic()
+                                                      .confirmSelect()
+                                                      .consumeResponse(ConfirmSelectOkBody.class)
+                                                      .basic()
+                                                      .publishExchange("")
+                                                      .publishRoutingKey("unrouteable")
+                                                      .publishMandatory(true)
+                                                      .content("Test")
+                                                      .publishMessage()
+                                                      .consumeResponse()
+                                                      .getLatestResponse(BasicNackBody.class);
+            assertThat(nackBody.getDeliveryTag(), is(equalTo(1L)));
+        }
+    }
+
+    /**
+     * Enables confirms, publishes a non-mandatory message with the routing key "unrouteable" and
+     * expects a {@link BasicAckBody} in response. Currently ignored; see the reason on the annotation.
+     *
+     * @throws Exception if the protocol interaction fails
+     */
+    @Test
+    @Ignore("QPID-7948 unrouteable message sent without mandatory true is neither ack'd nor nack'd")
+    public void publishUnrouteableMessage() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.openAnonymousConnection()
+                       .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                       .basic().confirmSelect().consumeResponse(ConfirmSelectOkBody.class)
+                       .basic().publishExchange("")
+                       .publishRoutingKey("unrouteable")
+                       .publishMandatory(false)
+                       .content("Test")
+                       .publishMessage()
+                       .consumeResponse(BasicAckBody.class);
+        }
+    }
+
+    /**
+     * Makes a channel transactional with tx.select, then sends confirm.select and expects a
+     * {@link ChannelCloseBody} whose reply code is {@code ErrorCodes.COMMAND_INVALID}.
+     *
+     * NOTE(review): the test is ignored without a recorded reason; the comment beside the
+     * assertion suggests the expected reply code still has to be checked against RabbitMQ.
+     *
+     * @throws Exception if the protocol interaction fails
+     */
+    @Test
+    @Ignore
+    @SpecificationTest(section = "https://www.rabbitmq.com/confirms.html",
+            description = "A transactional channel cannot be put into confirm mode and once a channel is in confirm "
+                          + "mode, it cannot be made transactional.")
+    public void transactionChannelCannotEnableConfirms() throws Exception
+    {
+        try(FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            ChannelCloseBody closeBody = interaction.openAnonymousConnection()
+                                                    .channel().open().consumeResponse(ChannelOpenOkBody.class)
+                                                    .tx().select()
+                                                    .consumeResponse(TxSelectOkBody.class)
+                                                    .basic()
+                                                    .confirmSelect()
+                                                    .consumeResponse()
+                                                    .getLatestResponse(ChannelCloseBody.class);
+            // Check behaviour of RabbitMQ.
+            assertThat(closeBody.getReplyCode(), is(equalTo(ErrorCodes.COMMAND_INVALID)));
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org