You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/23 13:04:43 UTC

[qpid-broker-j] branch master updated (5381ed2 -> 1d8e033)

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git.


    from 5381ed2  QPID-8349: [Tests][AMQP 1.0] Remove assertions from Utils
     new af83b6d  QPID-8349: [Tests][AMQP 1.0] Fix ExistingQueueAdmin
     new 1d327d9  QPID-8349: [Tests][AMQP 1.0] Report decoding errors
     new 1e416eb  QPID-8350: [Tests][AMQP 1.0] Ignore sporadic flow perfromatives in transfer tests
     new 605e727  QPID-8350: [Tests][AMQP 1.0] Improve handling of amqp errors in tests
     new 1d8e033  QPID-8349: [Tests][AMQP 1.0] Add ability to close connections without asserting peer responses

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../tests/protocol/v1_0/ExistingQueueAdmin.java    |  99 +++++------
 .../qpid/tests/protocol/v1_0/FrameDecoder.java     |  17 ++
 .../qpid/tests/protocol/v1_0/Interaction.java      |  32 +++-
 .../org/apache/qpid/tests/protocol/v1_0/Utils.java |  10 +-
 .../qpid/tests/protocol/v1_0/DecodeErrorTest.java  | 151 +++++++----------
 .../anonymousterminus/AnonymousTerminusTest.java   |  59 ++-----
 .../protocol/v1_0/messaging/MessageFormat.java     |  42 +----
 .../protocol/v1_0/messaging/MultiTransferTest.java |  56 ++++--
 .../protocol/v1_0/messaging/TransferTest.java      | 188 +++++++++++----------
 .../protocol/v1_0/transaction/DischargeTest.java   |   8 +-
 .../transaction/TransactionalTransferTest.java     |  34 ++--
 .../v1_0/transport/connection/OpenTest.java        |  23 +--
 .../protocol/v1_0/transport/link/AttachTest.java   |  33 ++--
 .../protocol/v1_0/transport/link/FlowTest.java     |  35 ++--
 .../v1_0/transport/link/LinkStealingTest.java      |   5 +-
 .../protocol/v1_0/transport/session/BeginTest.java |  17 +-
 16 files changed, 414 insertions(+), 395 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 05/05: QPID-8349: [Tests][AMQP 1.0] Add ability to close connections without asserting peer responses

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 1d8e03306dca63405744126991b84eadfcc92b6b
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:43:39 2019 +0100

    QPID-8349: [Tests][AMQP 1.0] Add ability to close connections without asserting peer responses
---
 .../org/apache/qpid/tests/protocol/v1_0/Interaction.java     | 12 ++++++++++++
 .../main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java | 10 ++--------
 .../org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java |  3 +++
 .../extensions/anonymousterminus/AnonymousTerminusTest.java  |  5 +++--
 .../tests/protocol/v1_0/messaging/MultiTransferTest.java     | 10 ++++++++++
 .../qpid/tests/protocol/v1_0/messaging/TransferTest.java     |  2 +-
 6 files changed, 31 insertions(+), 11 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 57d90d2..c0ccf92 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -1282,4 +1282,16 @@ public class Interaction extends AbstractInteraction<Interaction>
         assertion.accept(latestResponse);
         return this;
     }
+
+    public void detachEndCloseUnconditionally() throws Exception
+    {
+        detachClose(true).detach().end().close().sync();
+    }
+
+    public Interaction closeUnconditionally() throws Exception
+    {
+        close().sync();
+        return this;
+    }
+
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 36bc8a7..cb55d7f 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -99,10 +99,7 @@ public class Utils
                        .dispositionLast(interaction.getLatestDeliveryId())
                        .dispositionState(new Accepted())
                        .disposition()
-                       .detachClose(true)
-                       .detach().consumeResponse(Detach.class)
-                       .end().consumeResponse(End.class)
-                       .doCloseConnection();
+                       .detachEndCloseUnconditionally();
             return interaction.getDecodedLatestDelivery();
         }
     }
@@ -178,10 +175,7 @@ public class Utils
                                .sync();
                     tag++;
                 }
-                interaction.detachClose(true)
-                    .detach().consumeResponse(Detach.class)
-                    .end().consumeResponse(End.class)
-                    .doCloseConnection();
+                interaction.detachEndCloseUnconditionally();
             }
         }
     }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index c0aaf45..d3d925c 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -106,6 +106,7 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
                            .transfer();
             }
 
+            interaction.closeUnconditionally();
         }
 
         final String validMessage = getTestName() + "_2";
@@ -132,6 +133,7 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
                                                         .attachSource(source)
                                                         .attachRole(Role.SENDER)
                                                         .attach().consumeResponse()
+                                                        .closeUnconditionally()
                                                         .getLatestResponse();
 
             assertThat(latestResponse, is(notNullValue()));
@@ -165,6 +167,7 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
                                                         .attachTarget(target)
                                                         .attachRole(Role.SENDER)
                                                         .attach().consumeResponse()
+                                                        .closeUnconditionally()
                                                         .getLatestResponse();
 
             assertThat(latestResponse, is(notNullValue()));
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
index 10a700c..4748f10 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
@@ -106,7 +106,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.TRUE)
                        .transferDeliveryTag(_deliveryTag)
                        .transfer()
-                       .sync();
+                       .detachEndCloseUnconditionally();
 
             assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME),
                        is(equalTo(getTestName())));
@@ -267,7 +267,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(_deliveryTag)
                        .transferTransactionalStateFromCurrentTransaction()
                        .transferSettled(Boolean.TRUE)
-                       .transfer().txnDischarge(false);
+                       .transfer().txnDischarge(false)
+                       .detachEndCloseUnconditionally();
 
             assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
 
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 766afe5..da85c83 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -106,6 +106,9 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
             {
                 payload.dispose();
             }
+
+            interaction.detachEndCloseUnconditionally();
+
             assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
             assertThat(disposition.getLast(), oneOf(null, deliveryId));
             assertThat(disposition.getSettled(), is(equalTo(true)));
@@ -165,6 +168,8 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
             }
 
             Disposition disposition = interaction.consume(Disposition.class, Flow.class);
+            interaction.detachEndCloseUnconditionally();
+
             assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
             assertThat(disposition.getLast(), oneOf(null, deliveryId));
             assertThat(disposition.getSettled(), is(equalTo(true)));
@@ -210,6 +215,8 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
             {
                 payload.dispose();
             }
+
+            interaction.detachEndCloseUnconditionally();
         }
         String secondMessage = getTestName() + "_2";
         Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
@@ -309,6 +316,8 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                 assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
             }
 
+            interaction.detachEndCloseUnconditionally();
+
             assertThat("Unexpected number of dispositions", dispositionMap.size(), equalTo(2));
             assertThat(dispositionMap.containsKey(deliverId1), is(true));
             assertThat(dispositionMap.containsKey(deliveryId2), is(true));
@@ -387,6 +396,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                 payload.dispose();
             }
 
+            interaction.closeUnconditionally();
         }
 
         final String controlMessage = getTestName() + "_Control";
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 6332ce4..7b42edb 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -927,7 +927,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .transfer()
                        .sync();
 
-            interaction.doCloseConnection();
+            interaction.closeUnconditionally();
 
         }
         assertTestQueueMessages(contents);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 02/05: QPID-8349: [Tests][AMQP 1.0] Report decoding errors

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 1d327d9b1ad316c0d78d6fe78a96a36108ef274c
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:10:54 2019 +0100

    QPID-8349: [Tests][AMQP 1.0] Report decoding errors
---
 .../apache/qpid/tests/protocol/v1_0/FrameDecoder.java   | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
index 39b16fa..0e86c01 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameDecoder.java
@@ -210,6 +210,7 @@ public class FrameDecoder implements InputDecoder
         public void handleError(final Error parsingError)
         {
             LOGGER.error("Unexpected error {}", parsingError);
+            _responseQueue.add(new FrameDecodeError(parsingError));
         }
 
         @Override
@@ -289,4 +290,20 @@ public class FrameDecoder implements InputDecoder
 
         }
     }
+
+    private static class FrameDecodeError implements Response<Error>
+    {
+        private final Error _error;
+
+        FrameDecodeError(final Error error)
+        {
+            _error = error;
+        }
+
+        @Override
+        public Error getBody()
+        {
+            return _error;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 04/05: QPID-8350: [Tests][AMQP 1.0] Improve handling of amqp errors in tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 605e727da226b503db95d3d2078e6196caf1aab6
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:39:35 2019 +0100

    QPID-8350: [Tests][AMQP 1.0] Improve handling of amqp errors in tests
---
 .../qpid/tests/protocol/v1_0/DecodeErrorTest.java  | 148 +++++++++------------
 .../protocol/v1_0/messaging/MessageFormat.java     |  42 ++----
 .../protocol/v1_0/messaging/MultiTransferTest.java |  29 +++-
 .../protocol/v1_0/messaging/TransferTest.java      | 120 ++++++++++-------
 .../v1_0/transport/connection/OpenTest.java        |  23 ++--
 .../protocol/v1_0/transport/link/AttachTest.java   |  33 +++--
 .../protocol/v1_0/transport/link/FlowTest.java     |  35 +++--
 .../v1_0/transport/link/LinkStealingTest.java      |   5 +-
 .../protocol/v1_0/transport/session/BeginTest.java |  17 ++-
 9 files changed, 238 insertions(+), 214 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index ce604c9..c0aaf45 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -25,8 +25,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
@@ -40,6 +40,7 @@ import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.codec.StringWriter;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
@@ -52,13 +53,11 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -91,6 +90,7 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
                        .begin()
                        .consumeResponse(Begin.class)
                        .attachRole(Role.SENDER)
+                       .attachSndSettleMode(SenderSettleMode.SETTLED)
                        .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attach()
                        .consumeResponse(Attach.class)
@@ -99,31 +99,24 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
                                              flow -> assumeThat(flow.getLinkCredit(),
                                                                 is(greaterThan(UnsignedInteger.ZERO))));
 
-            final List<QpidByteBuffer> payloads = buildInvalidMessage();
-            try
-            {
-                try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads))
-                {
-                    interaction.transferMessageFormat(UnsignedInteger.ZERO)
-                               .transferPayload(combinedPayload)
-                               .transfer();
-                }
-            }
-            finally
+            try(final QpidByteBuffer payload = buildInvalidMessage())
             {
-                payloads.forEach(QpidByteBuffer::dispose);
+                interaction.transferMessageFormat(UnsignedInteger.ZERO)
+                           .transferPayload(payload)
+                           .transfer();
             }
 
-            final Detach detachResponse = interaction.consumeResponse()
-                                                     .getLatestResponse(Detach.class);
-            assertThat(detachResponse.getError(), is(notNullValue()));
-            assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR)));
         }
+
+        final String validMessage = getTestName() + "_2";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, validMessage);
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(validMessage)));
     }
 
     @Test
     @SpecificationTest(section = "3.5.9",
-            description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
+            description = "Node Properties [...] lifetime-policy [...] "
+                          + "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
     public void nodePropertiesLifetimePolicy() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -143,24 +136,10 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
 
             assertThat(latestResponse, is(notNullValue()));
             final Object responseBody = latestResponse.getBody();
-            final Error error;
-            if (responseBody instanceof End)
-            {
-                error = ((End) responseBody).getError();
-            }
-            else if (responseBody instanceof Close)
-            {
-                error = ((Close) responseBody).getError();
-            }
-            else if (responseBody instanceof Detach)
-            {
-                error = ((Detach) responseBody).getError();
-            }
-            else
-            {
-                fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
-                error = null;
-            }
+            assertThat(responseBody, is(notNullValue()));
+            assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class));
+
+            final Error error = ((ErrorCarryingFrameBody) responseBody).getError();
 
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
@@ -169,7 +148,8 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "3.5.9",
-            description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
+            description = "Node Properties [...] supported-dist-modes [...] "
+                          + "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
     public void nodePropertiesSupportedDistributionModes() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -189,70 +169,66 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
 
             assertThat(latestResponse, is(notNullValue()));
             final Object responseBody = latestResponse.getBody();
-            final Error error;
-            if (responseBody instanceof End)
-            {
-                error = ((End) responseBody).getError();
-            }
-            else if (responseBody instanceof Close)
-            {
-                error = ((Close) responseBody).getError();
-            }
-            else
-            {
-                fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
-                error = null;
-            }
+            assertThat(responseBody, is(notNullValue()));
+            assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class));
 
+            final Error error = ((ErrorCarryingFrameBody) responseBody).getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
         }
     }
 
-    private List<QpidByteBuffer> buildInvalidMessage()
+    private QpidByteBuffer buildInvalidMessage()
     {
         final List<QpidByteBuffer> payloads = new ArrayList<>();
-        final Header header = new Header();
-        header.setTtl(UnsignedInteger.valueOf(1000L));
-        final HeaderSection headerSection = header.createEncodingRetainingSection();
         try
         {
-            payloads.add(headerSection.getEncodedForm());
-        }
-        finally
-        {
-            headerSection.dispose();
-        }
+            final Header header = new Header();
+            header.setTtl(UnsignedInteger.valueOf(10000L));
+            final HeaderSection headerSection = header.createEncodingRetainingSection();
+            try
+            {
+                payloads.add(headerSection.getEncodedForm());
+            }
+            finally
+            {
+                headerSection.dispose();
+            }
 
-        final StringWriter stringWriter = new StringWriter("string in between annotation sections");
-        QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
-        stringWriter.writeToBuffer(encodedString);
-        encodedString.flip();
-        payloads.add(encodedString);
+            final StringWriter stringWriter = new StringWriter("string in between message sections");
+            final QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
+            stringWriter.writeToBuffer(encodedString);
+            encodedString.flip();
+            payloads.add(encodedString);
 
-        final Map<Symbol, Object> annoationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
-        final DeliveryAnnotations annotations = new DeliveryAnnotations(annoationMap);
-        final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
-        try
-        {
+            final Map<Symbol, Object> annotationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
+            final DeliveryAnnotations annotations = new DeliveryAnnotations(annotationMap);
+            final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
+            try
+            {
+                payloads.add(deliveryAnnotationsSection.getEncodedForm());
+            }
+            finally
+            {
+                deliveryAnnotationsSection.dispose();
+            }
 
-            payloads.add(deliveryAnnotationsSection.getEncodedForm());
-        }
-        finally
-        {
-            deliveryAnnotationsSection.dispose();
-        }
+            final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
+            try
+            {
+                payloads.add(payload.getEncodedForm());
+            }
+            finally
+            {
+                payload.dispose();
+            }
 
-        final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
-        try
-        {
-            payloads.add(payload.getEncodedForm());
+            return QpidByteBuffer.concatenate(payloads);
         }
         finally
         {
-            payload.dispose();
+            payloads.forEach(QpidByteBuffer::dispose);
         }
-        return payloads;
     }
 
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 6ac8e7f..5539291 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -20,10 +20,9 @@
 
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
-import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
 
 import java.net.InetSocketAddress;
 
@@ -34,18 +33,13 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
 
@@ -72,7 +66,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase
         {
             QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);
 
-            final Response<?> latestResponse = transport.newInteraction()
+            transport.newInteraction()
                                                         .negotiateProtocol().consumeResponse()
                                                         .open().consumeResponse(Open.class)
                                                         .begin().consumeResponse(Begin.class)
@@ -83,6 +77,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase
                                                         .transferMore(true)
                                                         .transferMessageFormat(UnsignedInteger.ZERO)
                                                         .transferPayload(payloads[0])
+                                                        .transferSettled(true)
                                                         .transfer()
                                                         .consumeResponse(null, Flow.class, Disposition.class)
                                                         .transferDeliveryTag(null)
@@ -91,35 +86,16 @@ public class MessageFormat extends BrokerAdminUsingTestBase
                                                         .transferMessageFormat(UnsignedInteger.ONE)
                                                         .transferPayload(payloads[1])
                                                         .transfer()
-                                                        .consumeResponse(Detach.class, End.class, Close.class)
-                                                        .getLatestResponse();
+                                                        .sync();
 
             for (final QpidByteBuffer payload : payloads)
             {
                 payload.dispose();
             }
-            assertThat(latestResponse, is(notNullValue()));
-            final Object responseBody = latestResponse.getBody();
-            final Error error;
-            if (responseBody instanceof Detach)
-            {
-                error = ((Detach) responseBody).getError();
-            }
-            else if (responseBody instanceof End)
-            {
-                error = ((End) responseBody).getError();
-            }
-            else if (responseBody instanceof Close)
-            {
-                error = ((Close) responseBody).getError();
-            }
-            else
-            {
-                fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
-                error = null;
-            }
-
-            assertThat(error, is(notNullValue()));
         }
+
+        final String testMessage = getTestName() + "_2";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessage);
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(testMessage)));
     }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 536d6ab..766afe5 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -210,7 +210,6 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
             {
                 payload.dispose();
             }
-            interaction.consumeResponse(null, Flow.class);
         }
         String secondMessage = getTestName() + "_2";
         Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
@@ -320,7 +319,12 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "2.6.14",
-            description = "[...]messages transferred along a single link MUST NOT be interleaved")
+            description = "For messages that are too large to fit within the maximum frame size,"
+                          + " additional data MAY be transferred in additional transfer frames by setting"
+                          + " the more flag on all but the last transfer frame."
+                          + " When a message is split up into multiple transfer frames in this manner,"
+                          + " messages being transferred along different links MAY be interleaved."
+                          + " However, messages transferred along a single link MUST NOT be interleaved.")
     public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception
     {
         String messageContent1 = getTestName() + "_1";
@@ -351,14 +355,28 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(true)
                        .transferPayload(messagePayload1[0])
+                       .transferSettled(true)
                        .transfer()
-                       .sync()
 
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(true)
+                       .transferSettled(true)
                        .transferPayload(messagePayload2[0])
                        .transfer()
+
+                       .transferDeliveryId(deliverId1)
+                       .transferDeliveryTag(deliveryTag1)
+                       .transferMore(false)
+                       .transferPayload(messagePayload1[1])
+                       .transfer()
+
+                       .transferDeliveryId(deliveryId2)
+                       .transferDeliveryTag(deliveryTag2)
+                       .transferMore(false)
+                       .transferPayload(messagePayload2[1])
+                       .transfer()
+
                        .sync();
             for (final QpidByteBuffer payload : messagePayload1)
             {
@@ -369,7 +387,10 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                 payload.dispose();
             }
 
-            interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
         }
+
+        final String controlMessage = getTestName() + "_Control";
+        Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, controlMessage);
+        assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(controlMessage)));
     }
 }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 733ba94..6332ce4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -68,7 +68,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
@@ -77,7 +76,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.protocol.ChannelClosedResponse;
 import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -106,7 +104,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
     @Test
     @SpecificationTest(section = "1.3.4",
             description = "mandatory [...] a non null value for the field is always encoded.")
-    public void emptyTransfer() throws Exception
+    public void transferHandleUnspecified() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
@@ -123,25 +121,43 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                            .consumeResponse()
                                            .getLatestResponse();
 
+            assertThat(response, is(notNullValue()));
             assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
 
-            if (response.getBody() instanceof Close)
-            {
-                final Close responseClose = (Close)response.getBody();
-                assertThat(responseClose.getError(), is(notNullValue()));
-                assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+            final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+        }
+    }
 
-                interact.close().sync();
-            }
-            else if (response.getBody() instanceof End)
-            {
-                final End responseEnd = (End)response.getBody();
-                assertThat(responseEnd.getError(), is(notNullValue()));
-                assertThat(responseEnd.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+    @Test
+    @SpecificationTest(section = "2.7.5",
+            description = "The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery.")
+    public void transferDeliveryIdUnspecified() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            Interaction interact = transport.newInteraction();
+            Response<?> response = interact.negotiateProtocol().consumeResponse()
+                                           .open().consumeResponse(Open.class)
+                                           .begin().consumeResponse(Begin.class)
+                                           .attachRole(Role.SENDER)
+                                           .attach().consumeResponse(Attach.class)
+                                           .consumeResponse(Flow.class)
+                                           .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
+                                           .transferDeliveryId(null)
+                                           .transfer()
+                                           .consumeResponse()
+                                           .getLatestResponse();
 
-                interact.end().doCloseConnection();
-            }
-            transport.assertNoMoreResponses();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+
+            final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
         }
     }
 
@@ -149,30 +165,40 @@ public class TransferTest extends BrokerAdminUsingTestBase
     @SpecificationTest(section = "2.7.5",
             description = "[delivery-tag] MUST be specified for the first transfer "
                           + "[...] and can only be omitted for continuation transfers.")
-    public void transferWithoutDeliveryTag() throws Exception
+    public void transferDeliveryTagUnspecified() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             Interaction interaction = transport.newInteraction()
-                                                 .negotiateProtocol().consumeResponse()
-                                                 .open().consumeResponse(Open.class)
-                                                 .begin().consumeResponse(Begin.class)
-                                                 .attachRole(Role.SENDER)
-                                                 .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                                 .attach().consumeResponse(Attach.class)
-                                                 .consumeResponse(Flow.class)
-                                                 .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
-                                                 .transferDeliveryId()
-                                                 .transferDeliveryTag(null)
-                                                 .transferPayloadData(getTestName())
-                                                 .transfer();
-            interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
+                                               .negotiateProtocol().consumeResponse()
+                                               .open().consumeResponse(Open.class)
+                                               .begin().consumeResponse(Begin.class)
+                                               .attachRole(Role.SENDER)
+                                               .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                               .attach().consumeResponse(Attach.class)
+                                               .consumeResponse(Flow.class)
+                                               .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
+                                               .transferDeliveryId()
+                                               .transferDeliveryTag(null)
+                                               .transferPayloadData(getTestName())
+                                               .transfer()
+                                               .consumeResponse();
+
+            final Response<?> response = interaction.getLatestResponse();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+
+            final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
         }
     }
 
     @Test
-    @SpecificationTest(section = "2.6.12",
-            description = "Transferring A Message.")
+    @SpecificationTest(section = "2.6.12 Transferring A Message",
+            description = "[...] the receiving application chooses to settle immediately upon processing the message"
+                          + " rather than waiting for the sender to settle first, that yields an at-least-once guarantee.")
     public void transferUnsettled() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -319,27 +345,19 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                      .consumeResponse()
                                      .getLatestResponse();
 
-            if (response.getBody() instanceof Detach)
-            {
-                final Detach detach = (Detach) response.getBody();
-                Error error = detach.getError();
-                assertThat(error, is(notNullValue()));
-                assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
-            }
-            else
-            {
-                if (response.getBody() instanceof Disposition)
-                {
-                    // clean up
-                    Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-                }
-                fail("it is illegal to set transfer 'rcv-settle-mode' to 'second' when link 'rcv-settle-mode' is set to 'first'");
-            }
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(Detach.class)));
+
+            final Detach detach = (Detach) response.getBody();
+            Error error = detach.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
         }
     }
 
     @Test
-    @SpecificationTest(section = "", description = "Pipelined message send")
+    @SpecificationTest(section = "2.6.12 Transferring A Message", description = "Pipelined message send")
     public void presettledPipelined() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index 17fdab0..f0f46f4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.CoreMatchers.both;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -74,7 +75,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
             Error error = responseClose.getError();
             if (error != null)
             {
-                assertThat(error.getCondition(), equalTo(AmqpError.DECODE_ERROR));
+                assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
             }
         }
     }
@@ -89,16 +90,18 @@ public class OpenTest extends BrokerAdminUsingTestBase
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
             Interaction interaction = transport.newInteraction();
-            Open responseOpen = interaction
-                                         .negotiateProtocol().consumeResponse()
-                                         .openContainerId("testContainerId")
-                                         .open().consumeResponse()
-                                         .getLatestResponse(Open.class);
+            final Open responseOpen = interaction.negotiateProtocol().consumeResponse()
+                                                 .openContainerId("testContainerId")
+                                                 .open().consumeResponse()
+                                                 .getLatestResponse(Open.class);
+
             assertThat(responseOpen.getContainerId(), is(notNullValue()));
-            assertThat(responseOpen.getMaxFrameSize().longValue(),
-                       is(both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE.longValue()))));
-            assertThat(responseOpen.getChannelMax().intValue(),
-                       is(both(greaterThanOrEqualTo(0)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE.intValue()))));
+            assertThat(responseOpen.getMaxFrameSize(),
+                       is(anyOf(nullValue(),
+                                both(greaterThan(UnsignedInteger.ZERO)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE)))));
+            assertThat(responseOpen.getChannelMax(),
+                       is(anyOf(nullValue(),
+                                both(greaterThanOrEqualTo(UnsignedShort.ZERO)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE)))));
 
             interaction.doCloseConnection();
         }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index 75f5985..a170e7e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -19,10 +19,12 @@
 
 package org.apache.qpid.tests.protocol.v1_0.transport.link;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
 import static org.hamcrest.Matchers.notNullValue;
@@ -32,6 +34,7 @@ import java.net.InetSocketAddress;
 
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -39,8 +42,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.protocol.SpecificationTest;
@@ -58,17 +63,23 @@ public class AttachTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Close responseClose = transport.newInteraction()
-                                           .negotiateProtocol().consumeResponse()
-                                           .open().consumeResponse(Open.class)
-                                           .begin().consumeResponse(Begin.class)
-                                           .attachRole(null)
-                                           .attachHandle(null)
-                                           .attachName(null)
-                                           .attach().consumeResponse()
-                                           .getLatestResponse(Close.class);
-            assertThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+            final Response<?> response = transport.newInteraction()
+                                                  .negotiateProtocol().consumeResponse()
+                                                  .open().consumeResponse(Open.class)
+                                                  .begin().consumeResponse(Begin.class)
+                                                  .attachRole(null)
+                                                  .attachHandle(null)
+                                                  .attachName(null)
+                                                  .attach().consumeResponse()
+                                                  .getLatestResponse();
+            assertThat(response.getBody(), is(notNullValue()));
+            assertThat(response.getBody(), instanceOf(ErrorCarryingFrameBody.class));
+            final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+            if (error != null)
+            {
+                assertThat(error.getCondition(),
+                           anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+            }
         }
     }
 
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 448de42..1f9968b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.qpid.tests.protocol.v1_0.transport.link;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
@@ -32,13 +33,14 @@ import java.net.InetSocketAddress;
 
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -62,19 +64,24 @@ public class FlowTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Close responseClose = transport.newInteraction()
-                                           .negotiateProtocol().consumeResponse()
-                                           .open().consumeResponse(Open.class)
-                                           .begin().consumeResponse(Begin.class)
-                                           .flowIncomingWindow(null)
-                                           .flowNextIncomingId(null)
-                                           .flowOutgoingWindow(null)
-                                           .flowNextOutgoingId(null)
-                                           .flow()
-                                           .consumeResponse(Close.class)
-                                           .getLatestResponse(Close.class);
-            assertThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR));
+            final Response<?> response = transport.newInteraction()
+                                                  .negotiateProtocol().consumeResponse()
+                                                  .open().consumeResponse(Open.class)
+                                                  .begin().consumeResponse(Begin.class)
+                                                  .flowIncomingWindow(null)
+                                                  .flowNextIncomingId(null)
+                                                  .flowOutgoingWindow(null)
+                                                  .flowNextOutgoingId(null)
+                                                  .flow()
+                                                  .consumeResponse()
+                                                  .getLatestResponse();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+            final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+            if (error != null)
+            {
+                assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+            }
         }
     }
 
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
index 260a3bc..fabcfa9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
@@ -47,6 +47,7 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
 
 public class LinkStealingTest extends BrokerAdminUsingTestBase
 {
@@ -94,7 +95,9 @@ public class LinkStealingTest extends BrokerAdminUsingTestBase
 
 
     @Test
-    @SpecificationTest(section = "2.6.1. Naming a link", description = "")
+    @SpecificationTest(section = "2.6.1. Naming a link",
+            description = "Qpid Broker J extended stolen behaviour on sessions")
+    @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
     public void subsequentAttachOnDifferentSessions() throws Exception
     {
         getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index f25d275..a1ea481 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.qpid.tests.protocol.v1_0.transport.session;
 
+import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.instanceOf;
@@ -31,13 +32,16 @@ import java.net.InetSocketAddress;
 
 import org.junit.Test;
 
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -54,16 +58,21 @@ public class BeginTest extends BrokerAdminUsingTestBase
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try(FrameTransport transport = new FrameTransport(addr).connect())
         {
-            Close responseClose = transport.newInteraction()
+            final Response<?> response =  transport.newInteraction()
                                            .negotiateProtocol().consumeResponse()
                                            .open().consumeResponse(Open.class)
                                            .beginNextOutgoingId(null)
                                            .beginIncomingWindow(null)
                                            .beginOutgoingWindow(null)
                                            .begin().consumeResponse()
-                                           .getLatestResponse(Close.class);
-            assumeThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+                                           .getLatestResponse();
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+            final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+            if (error != null)
+            {
+                assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+            }
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 03/05: QPID-8350: [Tests][AMQP 1.0] Ignore sporadic flow perfromatives in transfer tests

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 1e416ebf2dc1a91ea3f1dc7332a66ee9de9e8316
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:28:06 2019 +0100

    QPID-8350: [Tests][AMQP 1.0] Ignore sporadic flow perfromatives in transfer tests
---
 .../qpid/tests/protocol/v1_0/Interaction.java      | 20 +++++--
 .../anonymousterminus/AnonymousTerminusTest.java   | 54 +++++-------------
 .../protocol/v1_0/messaging/MultiTransferTest.java | 17 +++---
 .../protocol/v1_0/messaging/TransferTest.java      | 66 ++++++++++------------
 .../protocol/v1_0/transaction/DischargeTest.java   |  8 +--
 .../transaction/TransactionalTransferTest.java     | 34 ++++++-----
 6 files changed, 87 insertions(+), 112 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 8c7709d..57d90d2 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -1054,19 +1054,25 @@ public class Interaction extends AbstractInteraction<Interaction>
     private DeliveryState handleCoordinatorResponse() throws Exception
     {
         final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class));
-
         if (_coordinatorCredits.decrementAndGet() == 0)
         {
             expected.add(Flow.class);
         }
 
-        final Map<Class<?>, ?> responses = consumeResponses(expected);
+        final Map<Class<?>, ?> responses = consumeResponses(expected, Collections.singleton(Flow.class));
 
         final Disposition disposition = (Disposition) responses.get(Disposition.class);
         if (expected.contains(Flow.class))
         {
             Flow flow = (Flow) responses.get(Flow.class);
-            _coordinatorCredits.set(flow.getLinkCredit().longValue());
+            if (flow.getHandle().equals(getCoordinatorHandle()))
+            {
+                final UnsignedInteger linkCredit = flow.getLinkCredit();
+                if (linkCredit != null)
+                {
+                    _coordinatorCredits.set(linkCredit.longValue());
+                }
+            }
         }
         if (!Boolean.TRUE.equals(disposition.getSettled()))
         {
@@ -1075,13 +1081,15 @@ public class Interaction extends AbstractInteraction<Interaction>
         return disposition.getState();
     }
 
-    private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes)
+    private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes, Set<Class<?>> ignore)
             throws Exception
     {
-        Map<Class<?>, Object> results = new HashMap<>();
+        final Map<Class<?>, Object> results = new HashMap<>();
+        final Set<Class<?>> expected = new HashSet<>(responseTypes);
+        expected.addAll(ignore);
         do
         {
-            Response<?> response = consumeResponse(responseTypes).getLatestResponse();
+            Response<?> response = consumeResponse(expected).getLatestResponse();
             if (response != null && response.getBody() instanceof FrameBody)
             {
                 Class<?> bodyClass = response.getBody().getClass();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
index 369abf7..10a700c 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
@@ -38,7 +38,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.SequenceNumber;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -59,7 +58,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.util.StringUtil;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
@@ -72,7 +70,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
 {
     private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
     private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
-    private static final String TEST_MESSAGE_CONTENT = "test";
+
     private InetSocketAddress _brokerAddress;
     private Binary _deliveryTag;
 
@@ -110,8 +108,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transfer()
                        .sync();
 
-            Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME),
+                       is(equalTo(getTestName())));
         }
     }
 
@@ -145,7 +143,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(_deliveryTag)
                        .transfer();
 
-            Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
+            final Detach detach = interaction.consume(Detach.class, Flow.class);
             Error error = detach.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -183,10 +181,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferDeliveryId()
                        .transferPayload(generateMessagePayloadToDestination(getNonExistingDestinationName()))
                        .transferDeliveryTag(_deliveryTag)
-                       .transfer()
-                       .consumeResponse();
+                       .transfer();
 
-            Disposition disposition = interaction.getLatestResponse(Disposition.class);
+            final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
 
             assertThat(disposition.getSettled(), is(true));
 
@@ -233,7 +230,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(_deliveryTag)
                        .transfer();
 
-            Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            final Detach detach = interaction.consume(Detach.class, Flow.class);
             Error error = detach.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -275,7 +272,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
             assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
 
             Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+            assertThat(receivedMessage, is(equalTo(getTestName())));
         }
     }
 
@@ -305,7 +302,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.FALSE)
                        .transfer();
 
-            Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
 
             assertThat(disposition.getSettled(), is(true));
 
@@ -320,7 +317,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
             assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
 
             Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+            assertThat(receivedMessage, is(equalTo(getTestName())));
         }
     }
 
@@ -351,7 +348,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.FALSE)
                        .transfer();
 
-            Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
 
             assertThat(disposition.getSettled(), is(true));
 
@@ -400,7 +397,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.FALSE)
                        .transfer();
 
-            Detach senderLinkDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            final Detach senderLinkDetach = interaction.consume(Detach.class, Flow.class);
             Error senderLinkDetachError = senderLinkDetach.getError();
             assertThat(senderLinkDetachError, is(notNullValue()));
             assertThat(senderLinkDetachError.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -537,7 +534,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.TRUE)
                        .transfer().txnSendDischarge(false);
 
-            Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            final Detach transactionCoordinatorDetach = interaction.consume(Detach.class, Flow.class);
             Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
             assertThat(transactionCoordinatorDetachError, is(notNullValue()));
             assertThat(transactionCoordinatorDetachError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
@@ -549,29 +546,6 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
         return String.format("%sNonExisting%s", getTestName(), new StringUtil().randomAlphaNumericString(10));
     }
 
-    private Disposition getDispositionForDeliveryId(final Interaction interaction,
-                                                    final UnsignedInteger deliveryId) throws Exception
-    {
-        Disposition dischargeTransactionDisposition = null;
-
-        SequenceNumber id = new SequenceNumber(deliveryId.intValue());
-        do
-        {
-            Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse();
-            if (response.getBody() instanceof Disposition)
-            {
-                Disposition disposition = (Disposition) response.getBody();
-                UnsignedInteger first = disposition.getFirst();
-                UnsignedInteger last = disposition.getLast() == null ? disposition.getFirst() : disposition.getLast();
-                if (new SequenceNumber(first.intValue()).compareTo(id) >= 0 && new SequenceNumber(last.intValue()).compareTo(id) <=0)
-                {
-                    dischargeTransactionDisposition = disposition;
-                }
-            }
-        } while (dischargeTransactionDisposition == null);
-        return dischargeTransactionDisposition;
-    }
-
     private Interaction openInteractionWithAnonymousRelayCapability(final FrameTransport transport) throws Exception
     {
         final Interaction interaction = transport.newInteraction();
@@ -590,7 +564,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
         final Properties properties = new Properties();
         properties.setTo(destinationName);
         messageEncoder.setProperties(properties);
-        messageEncoder.addData(TEST_MESSAGE_CONTENT);
+        messageEncoder.addData(getTestName());
         return messageEncoder.getPayload();
     }
 
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index a5f7e7c..536d6ab 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -48,9 +48,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -100,8 +100,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                                                  .transferMore(false)
                                                  .transferPayload(payloads[1])
                                                  .transfer()
-                                                 .consumeResponse()
-                                                 .getLatestResponse(Disposition.class);
+                                                 .consume(Disposition.class, Flow.class);
 
             for (final QpidByteBuffer payload : payloads)
             {
@@ -158,15 +157,14 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(null)
                        .transferMore(false)
                        .transferPayload(payloads[3])
-                       .transfer()
-                       .consumeResponse();
-
-            Disposition disposition = interaction.getLatestResponse(Disposition.class);
+                       .transfer().sync();
 
             for (final QpidByteBuffer payload : payloads)
             {
                 payload.dispose();
             }
+
+            Disposition disposition = interaction.consume(Disposition.class, Flow.class);
             assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
             assertThat(disposition.getLast(), oneOf(null, deliveryId));
             assertThat(disposition.getSettled(), is(equalTo(true)));
@@ -201,8 +199,8 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
+                       .transferSettled(true)
                        .transfer()
-                       .sync()
                        .transferPayload(null)
                        .transferMore(null)
                        .transferAborted(true)
@@ -304,8 +302,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
             Map<UnsignedInteger, Disposition> dispositionMap = new HashMap<>();
             for (int i = 0; i < 2; i++)
             {
-                Disposition disposition = interaction.consumeResponse(Disposition.class)
-                                                     .getLatestResponse(Disposition.class);
+                Disposition disposition = interaction.consume(Disposition.class, Flow.class);
                 dispositionMap.put(disposition.getFirst(), disposition);
 
                 assertThat(disposition.getLast(), oneOf(null, disposition.getFirst()));
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index c89898d..733ba94 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -192,8 +192,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                        .transferHandle(linkHandle)
                                                        .transferPayloadData(getTestName())
                                                        .transfer()
-                                                       .consumeResponse()
-                                                       .getLatestResponse(Disposition.class);
+                                                       .consume(Disposition.class, Flow.class);
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
             assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
@@ -245,7 +244,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                      .transferDeliveryTag(deliveryTag)
                                      .transfer();
 
-            final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final Disposition disposition1 = interaction.consume(Disposition.class, Flow.class);
             final UnsignedInteger first = disposition1.getFirst();
             final UnsignedInteger last = disposition1.getLast();
 
@@ -254,7 +253,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
 
             if (last == null || first.equals(last))
             {
-                final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+                final Disposition disposition2 = interaction.consume(Disposition.class, Flow.class);
                 assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
                 assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
                 assertThat(disposition2.getFirst(), is(not(equalTo(first))));
@@ -265,7 +264,8 @@ public class TransferTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "2.7.5",
-            description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
+            description = "If first, this indicates that the receiver MUST settle the delivery once"
+                          + " it has arrived without waiting for the sender to settle first")
     public void transferReceiverSettleModeFirst() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -282,8 +282,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                        .transferPayloadData(getTestName())
                                                        .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                        .transfer()
-                                                       .consumeResponse()
-                                                       .getLatestResponse(Disposition.class);
+                                                       .consume(Disposition.class, Flow.class);
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
             assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
@@ -585,8 +584,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                  .dispositionRole(Role.RECEIVER)
                                                  .dispositionState(new Accepted())
                                                  .disposition()
-                                                 .consumeResponse(Disposition.class)
-                                                 .getLatestResponse(Disposition.class);
+                                                 .consume(Disposition.class, Flow.class);
             assertThat(disposition.getSettled(), is(true));
 
             interaction.dispositionSettled(true)
@@ -689,8 +687,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                  .dispositionRole(Role.RECEIVER)
                                                  .dispositionState(null)
                                                  .disposition()
-                                                 .consumeResponse(Disposition.class)
-                                                 .getLatestResponse(Disposition.class);
+                                                 .consume(Disposition.class, Flow.class);
             assertThat(disposition.getSettled(), is(true));
 
             interaction.consumeResponse(null, Flow.class);
@@ -836,8 +833,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .consumeResponse(Attach.class)
                        .assertLatestResponse(Attach.class, this::assumeReceiverSettlesSecond)
                        .consumeResponse(Flow.class)
-                       .assertLatestResponse(Flow.class,
-                                             flow -> assumeThat(flow.getLinkCredit().intValue(), is(greaterThan(1))))
+                       .assertLatestResponse(Flow.class, this::assumeCreditsGreaterThanOne)
                        .transferDeliveryId()
                        .transferDeliveryTag(deliveryTag)
                        .transferPayloadData(content1)
@@ -898,12 +894,9 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attach()
                        .consumeResponse(Attach.class)
-                       .consumeResponse(Flow.class);
-
-            Flow flow = interaction.getLatestResponse(Flow.class);
-            assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
-
-            interaction.transferDeliveryId(UnsignedInteger.ZERO)
+                       .consumeResponse(Flow.class)
+                       .assertLatestResponse(Flow.class, this::assumeCreditsGreaterThanOne)
+                       .transferDeliveryId(UnsignedInteger.ZERO)
                        .transferDeliveryTag(deliveryTag)
                        .transferPayloadData(contents[0])
                        .transferSettled(true)
@@ -1202,24 +1195,16 @@ public class TransferTest extends BrokerAdminUsingTestBase
     {
         do
         {
-            Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse();
-            if (response.getBody() instanceof Disposition)
-            {
-                Disposition disposition = (Disposition) response.getBody();
-                LongStream.rangeClosed(disposition.getFirst().longValue(),
-                                       disposition.getLast() == null
-                                               ? disposition.getFirst().longValue()
-                                               : disposition.getLast().longValue())
-                          .forEach(value -> {
-                              UnsignedInteger deliveryId = expectedDeliveryIds.first();
-                              assertThat(value, is(equalTo(deliveryId.longValue())));
-                              expectedDeliveryIds.remove(deliveryId);
-                          });
-            }
-            else if (response.getBody() instanceof Flow)
-            {
-                // ignore flows
-            }
+            Disposition disposition = interaction.consume(Disposition.class, Flow.class);
+            LongStream.rangeClosed(disposition.getFirst().longValue(),
+                                   disposition.getLast() == null
+                                           ? disposition.getFirst().longValue()
+                                           : disposition.getLast().longValue())
+                      .forEach(value -> {
+                          UnsignedInteger deliveryId = expectedDeliveryIds.first();
+                          assertThat(value, is(equalTo(deliveryId.longValue())));
+                          expectedDeliveryIds.remove(deliveryId);
+                      });
         }
         while (!expectedDeliveryIds.isEmpty());
     }
@@ -1243,9 +1228,16 @@ public class TransferTest extends BrokerAdminUsingTestBase
 
     private void assumeSufficientCredits(final Flow flow)
     {
+        assumeThat(flow.getLinkCredit(), is(notNullValue()));
         assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO)));
     }
 
+    private void assumeCreditsGreaterThanOne(final Flow flow)
+    {
+        assumeThat(flow.getLinkCredit(), is(notNullValue()));
+        assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ONE)));
+    }
+
     private void assumeReceiverSettlesSecond(final Attach attach)
     {
         assumeThat(attach.getRcvSettleMode(), is(equalTo(ReceiverSettleMode.SECOND)));
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 2036d1e..a22c8fc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -228,11 +228,11 @@ public class DischargeTest extends BrokerAdminUsingTestBase
                        .transferTransactionalStateFromCurrentTransaction()
                        .transferPayloadData(getTestName())
                        .transferHandle(UnsignedInteger.ONE)
-                       .transfer().consumeResponse(Disposition.class)
+                       .transfer().consume(Disposition.class, Flow.class);
 
-                       .detachHandle(UnsignedInteger.ONE)
-                       .detach().consumeResponse(Detach.class);
-            interaction.txnDischarge(false);
+            interaction.detachHandle(UnsignedInteger.ONE)
+                       .detach().consumeResponse(Detach.class)
+                       .txnDischarge(false);
 
             assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
         }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 6a334dc..0827209 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -37,6 +37,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -106,8 +107,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                                          .transferPayloadData(getTestName())
                                                          .transferTransactionalStateFromCurrentTransaction()
                                                          .transfer()
-                                                         .consumeResponse(Disposition.class)
-                                                         .getLatestResponse(Disposition.class);
+                                                         .consume(Disposition.class, Flow.class);
 
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
@@ -155,8 +155,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                                          .transferPayloadData(getTestName())
                                                          .transferTransactionalStateFromCurrentTransaction()
                                                          .transfer()
-                                                         .consumeResponse(Disposition.class)
-                                                         .getLatestResponse(Disposition.class);
+                                                         .consume(Disposition.class, Flow.class);
 
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
@@ -242,7 +241,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
 
             final Interaction interaction = transport.newInteraction();
 
-            Response<?> response = interaction.negotiateProtocol().consumeResponse()
+            ErrorCarryingFrameBody response = interaction.negotiateProtocol().consumeResponse()
                                               .open().consumeResponse(Open.class)
                                               .begin().consumeResponse(Begin.class)
 
@@ -260,10 +259,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                               .transferPayloadData(getTestName())
                                               .transferTransactionalState(integerToBinary(Integer.MAX_VALUE))
                                               .transfer()
-                                              .consumeResponse()
-                                              .getLatestResponse();
+                                              .consume(ErrorCarryingFrameBody.class, Flow.class);
 
-            assertUnknownTransactionIdError(response);
+            final Error error = response.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
         }
     }
 
@@ -413,14 +413,17 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
             Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
             assertThat(data, is(equalTo(getTestName())));
 
-            Response<?> response = interaction.dispositionSettled(true)
+            ErrorCarryingFrameBody response = interaction.dispositionSettled(true)
                                               .dispositionRole(Role.RECEIVER)
                                               .dispositionTransactionalState(integerToBinary(Integer.MAX_VALUE),
                                                                              new Accepted())
                                               .dispositionFirst(deliveryId)
                                               .disposition()
-                                              .consumeResponse().getLatestResponse();
-            assertUnknownTransactionIdError(response);
+                                              .consume(ErrorCarryingFrameBody.class, Flow.class);
+
+            final Error error = response.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
         }
         finally
         {
@@ -630,7 +633,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
-            Response<?> response = interaction.negotiateProtocol()
+            ErrorCarryingFrameBody response = interaction.negotiateProtocol()
                                               .consumeResponse()
                                               .open()
                                               .consumeResponse(Open.class)
@@ -653,10 +656,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                               .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"),
                                                                                        integerToBinary(Integer.MAX_VALUE)))
                                               .flow()
-                                              .consumeResponse()
-                                              .getLatestResponse();
+                                              .consume(ErrorCarryingFrameBody.class, Flow.class);
 
-            assertUnknownTransactionIdError(response);
+            final Error error = response.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
         }
         finally
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-broker-j] 01/05: QPID-8349: [Tests][AMQP 1.0] Fix ExistingQueueAdmin

Posted by or...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit af83b6db0463d095f626592e15a9e4f5297281f7
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Thu Aug 22 13:01:42 2019 +0100

    QPID-8349: [Tests][AMQP 1.0] Fix ExistingQueueAdmin
---
 .../tests/protocol/v1_0/ExistingQueueAdmin.java    | 99 +++++++++-------------
 1 file changed, 41 insertions(+), 58 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
index 313c4ff..d789b1a 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExistingQueueAdmin.java
@@ -31,8 +31,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -48,6 +46,7 @@ public class ExistingQueueAdmin implements QueueAdmin
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(ExistingQueueAdmin.class);
     private static final String ADMIN_LINK_NAME = "existingQueueAdminLink";
+    private static final int DRAIN_CREDITS = 1000;
 
     @Override
     public void createQueue(final BrokerAdmin brokerAdmin, final String queueName)
@@ -58,14 +57,7 @@ public class ExistingQueueAdmin implements QueueAdmin
     @Override
     public void deleteQueue(final BrokerAdmin brokerAdmin, final String queueName)
     {
-        try
-        {
-            drainQueue(brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP), queueName);
-        }
-        catch (Exception e)
-        {
-            throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e);
-        }
+        drain(queueName, brokerAdmin.getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP));
     }
 
     @Override
@@ -96,6 +88,18 @@ public class ExistingQueueAdmin implements QueueAdmin
         return true;
     }
 
+    private void drain(final String queueName, final InetSocketAddress brokerAddress)
+    {
+        try
+        {
+            drainQueue(brokerAddress, queueName);
+        }
+        catch (Exception e)
+        {
+            throw new BrokerAdminException(String.format("Cannot drain queue '%s'", queueName), e);
+        }
+    }
+
     private void putMessageOnQueue(final InetSocketAddress brokerAddress,
                                    final String queueName,
                                    final String... message) throws Exception
@@ -133,10 +137,9 @@ public class ExistingQueueAdmin implements QueueAdmin
     {
         interaction.detachClose(true)
                    .detach()
-                   .consumeResponse(Detach.class)
                    .end()
-                   .consumeResponse(End.class)
-                   .doCloseConnection();
+                   .close()
+                   .sync();
     }
 
 
@@ -152,73 +155,53 @@ public class ExistingQueueAdmin implements QueueAdmin
                        .attachRole(Role.RECEIVER)
                        .attachSndSettleMode(SenderSettleMode.SETTLED)
                        .attachSourceAddress(queueName)
-                       .attach().consumeResponse();
-
+                       .attach().consumeResponse(Attach.class)
+                       .flowIncomingWindow(UnsignedInteger.MAX_VALUE)
+                       .flowNextIncomingId(interaction.getCachedResponse(Begin.class).getNextOutgoingId())
+                       .flowLinkCredit(UnsignedInteger.valueOf(DRAIN_CREDITS))
+                       .flowHandleFromLinkHandle()
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowDrain(Boolean.TRUE)
+                       .flow();
             boolean received;
-            final Begin begin = interaction.getCachedResponse(Begin.class);
-            int nextIncomingId = begin.getNextOutgoingId().intValue();
             do
             {
-                received = receive(interaction, queueName, nextIncomingId);
-                nextIncomingId++;
+                received = receive(interaction, queueName);
             }
             while (received);
             closeInteraction(interaction);
         }
     }
 
-    private boolean receive(final Interaction interaction, String queueName, int nextIncomingId) throws Exception
+    private boolean receive(final Interaction interaction, String queueName) throws Exception
     {
-        interaction.flowIncomingWindow(UnsignedInteger.MAX_VALUE)
-                   .flowNextIncomingId(UnsignedInteger.valueOf(nextIncomingId))
-                   .flowLinkCredit(UnsignedInteger.ONE)
-                   .flowDrain(Boolean.TRUE)
-                   .flowHandleFromLinkHandle()
-                   .flowOutgoingWindow(UnsignedInteger.ZERO)
-                   .flowNextOutgoingId(UnsignedInteger.ZERO)
-                   .flow();
-
+        boolean transferExpected;
         boolean messageReceived = false;
-        boolean flowReceived = false;
         do
         {
-            Response<?> latestResponse;
-            try
-            {
-                latestResponse = interaction.consumeResponse(Transfer.class, Flow.class).getLatestResponse();
-            }
-            catch (IllegalStateException e)
-            {
-                if (messageReceived)
-                {
-                    LOGGER.debug(
-                            "Message was received on draining queue '{}' but flow was not. Assuming successful receive...",
-                            queueName,
-                            e);
-                }
-                else
-                {
-                    LOGGER.warn(
-                            "Neither message no flow was received on draining queue '{}'.  Assuming no messages on the queue...",
-                            queueName,
-                            e);
-                }
-                return messageReceived;
-            }
-            if (latestResponse.getBody() instanceof Transfer)
+            final Response<?> latestResponse =
+                    interaction.consumeResponse(Transfer.class, Flow.class, null).getLatestResponse();
+            if (latestResponse != null && latestResponse.getBody() instanceof Transfer)
             {
                 Transfer responseTransfer = (Transfer) latestResponse.getBody();
-                if (!Boolean.TRUE.equals(responseTransfer.getMore()))
+                transferExpected = Boolean.TRUE.equals(responseTransfer.getMore());
+                if (!transferExpected)
                 {
                     messageReceived = true;
                 }
             }
-            else if (latestResponse.getBody() instanceof Flow)
+            else if (latestResponse != null && latestResponse.getBody() instanceof Flow)
+            {
+                transferExpected = false;
+            }
+            else
             {
-                flowReceived = true;
+                LOGGER.warn("Neither transfer no flow was received from '{}'. Assuming no messages left...", queueName);
+                transferExpected = false;
             }
         }
-        while (!flowReceived);
+        while (transferExpected);
         return messageReceived;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org