You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/23 13:04:47 UTC

[qpid-broker-j] 04/05: QPID-8350: [Tests][AMQP 1.0] Improve handling of amqp errors in tests

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 605e727da226b503db95d3d2078e6196caf1aab6
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:39:35 2019 +0100

    QPID-8350: [Tests][AMQP 1.0] Improve handling of amqp errors in tests
---
 .../qpid/tests/protocol/v1_0/DecodeErrorTest.java  | 148 +++++++++------------
 .../protocol/v1_0/messaging/MessageFormat.java     |  42 ++----
 .../protocol/v1_0/messaging/MultiTransferTest.java |  29 +++-
 .../protocol/v1_0/messaging/TransferTest.java      | 120 ++++++++++-------
 .../v1_0/transport/connection/OpenTest.java        |  23 ++--
 .../protocol/v1_0/transport/link/AttachTest.java   |  33 +++--
 .../protocol/v1_0/transport/link/FlowTest.java     |  35 +++--
 .../v1_0/transport/link/LinkStealingTest.java      |   5 +-
 .../protocol/v1_0/transport/session/BeginTest.java |  17 ++-
 9 files changed, 238 insertions(+), 214 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index ce604c9..c0aaf45 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -25,8 +25,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
@@ -40,6 +40,7 @@ import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.codec.StringWriter;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
@@ -52,13 +53,11 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -91,6 +90,7 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
                        .begin()
                        .consumeResponse(Begin.class)
                        .attachRole(Role.SENDER)
+                       .attachSndSettleMode(SenderSettleMode.SETTLED)
                        .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attach()
                        .consumeResponse(Attach.class)
@@ -99,31 +99,24 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
                                              flow -> assumeThat(flow.getLinkCredit(),
                                                                 is(greaterThan(UnsignedInteger.ZERO))));
 
-            final List<QpidByteBuffer> payloads = buildInvalidMessage();
-            try
-            {
-                try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads))
-                {
-                    interaction.transferMessageFormat(UnsignedInteger.ZERO)
-                               .transferPayload(combinedPayload)
-                               .transfer();
-                }
-            }
-            finally
+            try(final QpidByteBuffer payload = buildInvalidMessage())
             {
-                payloads.forEach(QpidByteBuffer::dispose);
+                interaction.transferMessageFormat(UnsignedInteger.ZERO)
+                           .transferPayload(payload)
+                           .transfer();
             }
 
-            final Detach detachResponse = interaction.consumeResponse()
-                                                     .getLatestResponse(Detach.class);
-            assertThat(detachResponse.getError(), is(notNullValue()));
-            assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR)));
         }
+
+        final String validMessage = getTestName() + "_2";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, validMessage);
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(validMessage)));
     }
 
     @Test
     @SpecificationTest(section = "3.5.9",
-            description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
+            description = "Node Properties [...] lifetime-policy [...] "
+                          + "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
     public void nodePropertiesLifetimePolicy() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -143,24 +136,10 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
 
             assertThat(latestResponse, is(notNullValue()));
             final Object responseBody = latestResponse.getBody();
-            final Error error;
-            if (responseBody instanceof End)
-            {
-                error = ((End) responseBody).getError();
-            }
-            else if (responseBody instanceof Close)
-            {
-                error = ((Close) responseBody).getError();
-            }
-            else if (responseBody instanceof Detach)
-            {
-                error = ((Detach) responseBody).getError();
-            }
-            else
-            {
-                fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
-                error = null;
-            }
+            assertThat(responseBody, is(notNullValue()));
+            assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class));
+
+            final Error error = ((ErrorCarryingFrameBody) responseBody).getError();
 
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
@@ -169,7 +148,8 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "3.5.9",
-            description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
+            description = "Node Properties [...] supported-dist-modes [...] "
+                          + "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
     public void nodePropertiesSupportedDistributionModes() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -189,70 +169,66 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
 
             assertThat(latestResponse, is(notNullValue()));
             final Object responseBody = latestResponse.getBody();
-            final Error error;
-            if (responseBody instanceof End)
-            {
-                error = ((End) responseBody).getError();
-            }
-            else if (responseBody instanceof Close)
-            {
-                error = ((Close) responseBody).getError();
-            }
-            else
-            {
-                fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
-                error = null;
-            }
+            assertThat(responseBody, is(notNullValue()));
+            assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class));
 
+            final Error error = ((ErrorCarryingFrameBody) responseBody).getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
         }
     }
 
-    private List<QpidByteBuffer> buildInvalidMessage()
+    private QpidByteBuffer buildInvalidMessage()
     {
         final List<QpidByteBuffer> payloads = new ArrayList<>();
-        final Header header = new Header();
-        header.setTtl(UnsignedInteger.valueOf(1000L));
-        final HeaderSection headerSection = header.createEncodingRetainingSection();
         try
         {
-            payloads.add(headerSection.getEncodedForm());
-        }
-        finally
-        {
-            headerSection.dispose();
-        }
+            final Header header = new Header();
+            header.setTtl(UnsignedInteger.valueOf(10000L));
+            final HeaderSection headerSection = header.createEncodingRetainingSection();
+            try
+            {
+                payloads.add(headerSection.getEncodedForm());
+            }
+            finally
+            {
+                headerSection.dispose();
+            }
 
-        final StringWriter stringWriter = new StringWriter("string in between annotation sections");
-        QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
-        stringWriter.writeToBuffer(encodedString);
-        encodedString.flip();
-        payloads.add(encodedString);
+            final StringWriter stringWriter = new StringWriter("string in between message sections");
+            final QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
+            stringWriter.writeToBuffer(encodedString);
+            encodedString.flip();
+            payloads.add(encodedString);
 
-        final Map<Symbol, Object> annoationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
-        final DeliveryAnnotations annotations = new DeliveryAnnotations(annoationMap);
-        final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
-        try
-        {
+            final Map<Symbol, Object> annotationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
+            final DeliveryAnnotations annotations = new DeliveryAnnotations(annotationMap);
+            final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
+            try
+            {
+                payloads.add(deliveryAnnotationsSection.getEncodedForm());
+            }
+            finally
+            {
+                deliveryAnnotationsSection.dispose();
+            }
 
-            payloads.add(deliveryAnnotationsSection.getEncodedForm());
-        }
-        finally
-        {
-            deliveryAnnotationsSection.dispose();
-        }
+            final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
+            try
+            {
+                payloads.add(payload.getEncodedForm());
+            }
+            finally
+            {
+                payload.dispose();
+            }
 
-        final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
-        try
-        {
-            payloads.add(payload.getEncodedForm());
+            return QpidByteBuffer.concatenate(payloads);
         }
         finally
         {
-            payload.dispose();
+            payloads.forEach(QpidByteBuffer::dispose);
         }
-        return payloads;
     }
 
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 6ac8e7f..5539291 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -20,10 +20,9 @@
 
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
-import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
 
 import java.net.InetSocketAddress;
 
@@ -34,18 +33,13 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -72,7 +66,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase
         {
             QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);
 
-            final Response<?> latestResponse = transport.newInteraction()
+            transport.newInteraction()
                                                         .negotiateProtocol().consumeResponse()
                                                         .open().consumeResponse(Open.class)
                                                         .begin().consumeResponse(Begin.class)
@@ -83,6 +77,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase
                                                         .transferMore(true)
                                                         .transferMessageFormat(UnsignedInteger.ZERO)
                                                         .transferPayload(payloads[0])
+                                                        .transferSettled(true)
                                                         .transfer()
                                                         .consumeResponse(null, Flow.class, Disposition.class)
                                                         .transferDeliveryTag(null)
@@ -91,35 +86,16 @@ public class MessageFormat extends BrokerAdminUsingTestBase
                                                         .transferMessageFormat(UnsignedInteger.ONE)
                                                         .transferPayload(payloads[1])
                                                         .transfer()
-                                                        .consumeResponse(Detach.class, End.class, Close.class)
-                                                        .getLatestResponse();
+                                                        .sync();
 
             for (final QpidByteBuffer payload : payloads)
             {
                 payload.dispose();
             }
-            assertThat(latestResponse, is(notNullValue()));
-            final Object responseBody = latestResponse.getBody();
-            final Error error;
-            if (responseBody instanceof Detach)
-            {
-                error = ((Detach) responseBody).getError();
-            }
-            else if (responseBody instanceof End)
-            {
-                error = ((End) responseBody).getError();
-            }
-            else if (responseBody instanceof Close)
-            {
-                error = ((Close) responseBody).getError();
-            }
-            else
-            {
-                fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
-                error = null;
-            }
-
-            assertThat(error, is(notNullValue()));
         }
+
+        final String testMessage = getTestName() + "_2";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessage);
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(testMessage)));
     }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 536d6ab..766afe5 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -210,7 +210,6 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
             {
                 payload.dispose();
             }
-            interaction.consumeResponse(null, Flow.class);
         }
         String secondMessage = getTestName() + "_2";
         Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
@@ -320,7 +319,12 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "2.6.14",
-            description = "[...]messages transferred along a single link MUST NOT be interleaved")
+            description = "For messages that are too large to fit within the maximum frame size,"
+                          + " additional data MAY be transferred in additional transfer frames by setting"
+                          + " the more flag on all but the last transfer frame."
+                          + " When a message is split up into multiple transfer frames in this manner,"
+                          + " messages being transferred along different links MAY be interleaved."
+                          + " However, messages transferred along a single link MUST NOT be interleaved.")
     public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception
     {
         String messageContent1 = getTestName() + "_1";
@@ -351,14 +355,28 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(true)
                        .transferPayload(messagePayload1[0])
+                       .transferSettled(true)
                        .transfer()
-                       .sync()
 
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(true)
+                       .transferSettled(true)
                        .transferPayload(messagePayload2[0])
                        .transfer()
+
+                       .transferDeliveryId(deliverId1)
+                       .transferDeliveryTag(deliveryTag1)
+                       .transferMore(false)
+                       .transferPayload(messagePayload1[1])
+                       .transfer()
+
+                       .transferDeliveryId(deliveryId2)
+                       .transferDeliveryTag(deliveryTag2)
+                       .transferMore(false)
+                       .transferPayload(messagePayload2[1])
+                       .transfer()
+
                        .sync();
             for (final QpidByteBuffer payload : messagePayload1)
             {
@@ -369,7 +387,10 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                 payload.dispose();
             }
 
-            interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
         }
+
+        final String controlMessage = getTestName() + "_Control";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, controlMessage);
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(controlMessage)));
     }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 733ba94..6332ce4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -68,7 +68,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
@@ -77,7 +76,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.protocol.ChannelClosedResponse;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -106,7 +104,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
     @Test
     @SpecificationTest(section = "1.3.4",
             description = "mandatory [...] a non null value for the field is always encoded.")
-    public void emptyTransfer() throws Exception
+    public void transferHandleUnspecified() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
@@ -123,25 +121,43 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                            .consumeResponse()
                                            .getLatestResponse();
 
+            assertThat(response, is(notNullValue()));
             assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
 
-            if (response.getBody() instanceof Close)
-            {
-                final Close responseClose = (Close)response.getBody();
-                assertThat(responseClose.getError(), is(notNullValue()));
-                assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+            final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+        }
+    }
 
-                interact.close().sync();
-            }
-            else if (response.getBody() instanceof End)
-            {
-                final End responseEnd = (End)response.getBody();
-                assertThat(responseEnd.getError(), is(notNullValue()));
-                assertThat(responseEnd.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+    @Test
+    @SpecificationTest(section = "2.7.5",
+            description = "The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery.")
+    public void transferDeliveryIdUnspecified() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Interaction interact = transport.newInteraction();
+            Response<?> response = interact.negotiateProtocol().consumeResponse()
+                                           .open().consumeResponse(Open.class)
+                                           .begin().consumeResponse(Begin.class)
+                                           .attachRole(Role.SENDER)
+                                           .attach().consumeResponse(Attach.class)
+                                           .consumeResponse(Flow.class)
+                                           .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
+                                           .transferDeliveryId(null)
+                                           .transfer()
+                                           .consumeResponse()
+                                           .getLatestResponse();
 
-                interact.end().doCloseConnection();
-            }
-            transport.assertNoMoreResponses();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+
+            final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
         }
     }
 
@@ -149,30 +165,40 @@ public class TransferTest extends BrokerAdminUsingTestBase
     @SpecificationTest(section = "2.7.5",
             description = "[delivery-tag] MUST be specified for the first transfer "
                           + "[...] and can only be omitted for continuation transfers.")
-    public void transferWithoutDeliveryTag() throws Exception
+    public void transferDeliveryTagUnspecified() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             Interaction interaction = transport.newInteraction()
-                                                 .negotiateProtocol().consumeResponse()
-                                                 .open().consumeResponse(Open.class)
-                                                 .begin().consumeResponse(Begin.class)
-                                                 .attachRole(Role.SENDER)
-                                                 .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                                 .attach().consumeResponse(Attach.class)
-                                                 .consumeResponse(Flow.class)
-                                                 .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
-                                                 .transferDeliveryId()
-                                                 .transferDeliveryTag(null)
-                                                 .transferPayloadData(getTestName())
-                                                 .transfer();
-            interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
+                                               .negotiateProtocol().consumeResponse()
+                                               .open().consumeResponse(Open.class)
+                                               .begin().consumeResponse(Begin.class)
+                                               .attachRole(Role.SENDER)
+                                               .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                               .attach().consumeResponse(Attach.class)
+                                               .consumeResponse(Flow.class)
+                                               .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
+                                               .transferDeliveryId()
+                                               .transferDeliveryTag(null)
+                                               .transferPayloadData(getTestName())
+                                               .transfer()
+                                               .consumeResponse();
+
+            final Response<?> response = interaction.getLatestResponse();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+
+            final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
         }
     }
 
     @Test
-    @SpecificationTest(section = "2.6.12",
-            description = "Transferring A Message.")
+    @SpecificationTest(section = "2.6.12 Transferring A Message",
+            description = "[...] the receiving application chooses to settle immediately upon processing the message"
+                          + " rather than waiting for the sender to settle first, that yields an at-least-once guarantee.")
     public void transferUnsettled() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -319,27 +345,19 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                      .consumeResponse()
                                      .getLatestResponse();
 
-            if (response.getBody() instanceof Detach)
-            {
-                final Detach detach = (Detach) response.getBody();
-                Error error = detach.getError();
-                assertThat(error, is(notNullValue()));
-                assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
-            }
-            else
-            {
-                if (response.getBody() instanceof Disposition)
-                {
-                    // clean up
-                    Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-                }
-                fail("it is illegal to set transfer 'rcv-settle-mode' to 'second' when link 'rcv-settle-mode' is set to 'first'");
-            }
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(Detach.class)));
+
+            final Detach detach = (Detach) response.getBody();
+            Error error = detach.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
         }
     }
 
     @Test
-    @SpecificationTest(section = "", description = "Pipelined message send")
+    @SpecificationTest(section = "2.6.12 Transferring A Message", description = "Pipelined message send")
     public void presettledPipelined() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index 17fdab0..f0f46f4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.CoreMatchers.both;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -74,7 +75,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
             Error error = responseClose.getError();
             if (error != null)
             {
-                assertThat(error.getCondition(), equalTo(AmqpError.DECODE_ERROR));
+                assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
             }
         }
     }
@@ -89,16 +90,18 @@ public class OpenTest extends BrokerAdminUsingTestBase
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
             Interaction interaction = transport.newInteraction();
-            Open responseOpen = interaction
-                                         .negotiateProtocol().consumeResponse()
-                                         .openContainerId("testContainerId")
-                                         .open().consumeResponse()
-                                         .getLatestResponse(Open.class);
+            final Open responseOpen = interaction.negotiateProtocol().consumeResponse()
+                                                 .openContainerId("testContainerId")
+                                                 .open().consumeResponse()
+                                                 .getLatestResponse(Open.class);
+
             assertThat(responseOpen.getContainerId(), is(notNullValue()));
-            assertThat(responseOpen.getMaxFrameSize().longValue(),
-                       is(both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE.longValue()))));
-            assertThat(responseOpen.getChannelMax().intValue(),
-                       is(both(greaterThanOrEqualTo(0)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE.intValue()))));
+            assertThat(responseOpen.getMaxFrameSize(),
+                       is(anyOf(nullValue(),
+                                both(greaterThan(UnsignedInteger.ZERO)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE)))));
+            assertThat(responseOpen.getChannelMax(),
+                       is(anyOf(nullValue(),
+                                both(greaterThanOrEqualTo(UnsignedShort.ZERO)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE)))));
 
             interaction.doCloseConnection();
         }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index 75f5985..a170e7e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -19,10 +19,12 @@
 
 package org.apache.qpid.tests.protocol.v1_0.transport.link;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
@@ -32,6 +34,7 @@ import java.net.InetSocketAddress;
 
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -39,8 +42,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.SpecificationTest;
@@ -58,17 +63,23 @@ public class AttachTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Close responseClose = transport.newInteraction()
-                                           .negotiateProtocol().consumeResponse()
-                                           .open().consumeResponse(Open.class)
-                                           .begin().consumeResponse(Begin.class)
-                                           .attachRole(null)
-                                           .attachHandle(null)
-                                           .attachName(null)
-                                           .attach().consumeResponse()
-                                           .getLatestResponse(Close.class);
-            assertThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+            final Response<?> response = transport.newInteraction()
+                                                  .negotiateProtocol().consumeResponse()
+                                                  .open().consumeResponse(Open.class)
+                                                  .begin().consumeResponse(Begin.class)
+                                                  .attachRole(null)
+                                                  .attachHandle(null)
+                                                  .attachName(null)
+                                                  .attach().consumeResponse()
+                                                  .getLatestResponse();
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), instanceOf(ErrorCarryingFrameBody.class));
+            final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+            if (error != null)
+            {
+                assertThat(error.getCondition(),
+                           anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+            }
         }
     }
 
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 448de42..1f9968b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.qpid.tests.protocol.v1_0.transport.link;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
@@ -32,13 +33,14 @@ import java.net.InetSocketAddress;
 
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -62,19 +64,24 @@ public class FlowTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Close responseClose = transport.newInteraction()
-                                           .negotiateProtocol().consumeResponse()
-                                           .open().consumeResponse(Open.class)
-                                           .begin().consumeResponse(Begin.class)
-                                           .flowIncomingWindow(null)
-                                           .flowNextIncomingId(null)
-                                           .flowOutgoingWindow(null)
-                                           .flowNextOutgoingId(null)
-                                           .flow()
-                                           .consumeResponse(Close.class)
-                                           .getLatestResponse(Close.class);
-            assertThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR));
+            final Response<?> response = transport.newInteraction()
+                                                  .negotiateProtocol().consumeResponse()
+                                                  .open().consumeResponse(Open.class)
+                                                  .begin().consumeResponse(Begin.class)
+                                                  .flowIncomingWindow(null)
+                                                  .flowNextIncomingId(null)
+                                                  .flowOutgoingWindow(null)
+                                                  .flowNextOutgoingId(null)
+                                                  .flow()
+                                                  .consumeResponse()
+                                                  .getLatestResponse();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+            final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+            if (error != null)
+            {
+                assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+            }
         }
     }
 
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
index 260a3bc..fabcfa9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
@@ -47,6 +47,7 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class LinkStealingTest extends BrokerAdminUsingTestBase
 {
@@ -94,7 +95,9 @@ public class LinkStealingTest extends BrokerAdminUsingTestBase
 
 
     @Test
-    @SpecificationTest(section = "2.6.1. Naming a link", description = "")
+    @SpecificationTest(section = "2.6.1. Naming a link",
+            description = "Qpid Broker J extended stolen behaviour on sessions")
+    @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
     public void subsequentAttachOnDifferentSessions() throws Exception
     {
         getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index f25d275..a1ea481 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.tests.protocol.v1_0.transport.session;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -31,13 +32,16 @@ import java.net.InetSocketAddress;
 
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -54,16 +58,21 @@ public class BeginTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try(FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Close responseClose = transport.newInteraction()
+            final Response<?> response =  transport.newInteraction()
                                            .negotiateProtocol().consumeResponse()
                                            .open().consumeResponse(Open.class)
                                            .beginNextOutgoingId(null)
                                            .beginIncomingWindow(null)
                                            .beginOutgoingWindow(null)
                                            .begin().consumeResponse()
-                                           .getLatestResponse(Close.class);
-            assumeThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+                                           .getLatestResponse();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+            final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+            if (error != null)
+            {
+                assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org