You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/23 13:04:46 UTC

[qpid-broker-j] 03/05: QPID-8350: [Tests][AMQP 1.0] Ignore sporadic flow perfromatives in transfer tests

This is an automated email from the ASF dual-hosted git repository.

orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git

commit 1e416ebf2dc1a91ea3f1dc7332a66ee9de9e8316
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:28:06 2019 +0100

    QPID-8350: [Tests][AMQP 1.0] Ignore sporadic flow perfromatives in transfer tests
---
 .../qpid/tests/protocol/v1_0/Interaction.java      | 20 +++++--
 .../anonymousterminus/AnonymousTerminusTest.java   | 54 +++++-------------
 .../protocol/v1_0/messaging/MultiTransferTest.java | 17 +++---
 .../protocol/v1_0/messaging/TransferTest.java      | 66 ++++++++++------------
 .../protocol/v1_0/transaction/DischargeTest.java   |  8 +--
 .../transaction/TransactionalTransferTest.java     | 34 ++++++-----
 6 files changed, 87 insertions(+), 112 deletions(-)

diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 8c7709d..57d90d2 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -1054,19 +1054,25 @@ public class Interaction extends AbstractInteraction<Interaction>
     private DeliveryState handleCoordinatorResponse() throws Exception
     {
         final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class));
-
         if (_coordinatorCredits.decrementAndGet() == 0)
         {
             expected.add(Flow.class);
         }
 
-        final Map<Class<?>, ?> responses = consumeResponses(expected);
+        final Map<Class<?>, ?> responses = consumeResponses(expected, Collections.singleton(Flow.class));
 
         final Disposition disposition = (Disposition) responses.get(Disposition.class);
         if (expected.contains(Flow.class))
         {
             Flow flow = (Flow) responses.get(Flow.class);
-            _coordinatorCredits.set(flow.getLinkCredit().longValue());
+            if (flow.getHandle().equals(getCoordinatorHandle()))
+            {
+                final UnsignedInteger linkCredit = flow.getLinkCredit();
+                if (linkCredit != null)
+                {
+                    _coordinatorCredits.set(linkCredit.longValue());
+                }
+            }
         }
         if (!Boolean.TRUE.equals(disposition.getSettled()))
         {
@@ -1075,13 +1081,15 @@ public class Interaction extends AbstractInteraction<Interaction>
         return disposition.getState();
     }
 
-    private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes)
+    private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes, Set<Class<?>> ignore)
             throws Exception
     {
-        Map<Class<?>, Object> results = new HashMap<>();
+        final Map<Class<?>, Object> results = new HashMap<>();
+        final Set<Class<?>> expected = new HashSet<>(responseTypes);
+        expected.addAll(ignore);
         do
         {
-            Response<?> response = consumeResponse(responseTypes).getLatestResponse();
+            Response<?> response = consumeResponse(expected).getLatestResponse();
             if (response != null && response.getBody() instanceof FrameBody)
             {
                 Class<?> bodyClass = response.getBody().getClass();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
index 369abf7..10a700c 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
@@ -38,7 +38,6 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.SequenceNumber;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -59,7 +58,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.util.StringUtil;
-import org.apache.qpid.tests.protocol.Response;
 import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
@@ -72,7 +70,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
 {
     private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
     private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
-    private static final String TEST_MESSAGE_CONTENT = "test";
+
     private InetSocketAddress _brokerAddress;
     private Binary _deliveryTag;
 
@@ -110,8 +108,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transfer()
                        .sync();
 
-            Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+            assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME),
+                       is(equalTo(getTestName())));
         }
     }
 
@@ -145,7 +143,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(_deliveryTag)
                        .transfer();
 
-            Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
+            final Detach detach = interaction.consume(Detach.class, Flow.class);
             Error error = detach.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -183,10 +181,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferDeliveryId()
                        .transferPayload(generateMessagePayloadToDestination(getNonExistingDestinationName()))
                        .transferDeliveryTag(_deliveryTag)
-                       .transfer()
-                       .consumeResponse();
+                       .transfer();
 
-            Disposition disposition = interaction.getLatestResponse(Disposition.class);
+            final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
 
             assertThat(disposition.getSettled(), is(true));
 
@@ -233,7 +230,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(_deliveryTag)
                        .transfer();
 
-            Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            final Detach detach = interaction.consume(Detach.class, Flow.class);
             Error error = detach.getError();
             assertThat(error, is(notNullValue()));
             assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -275,7 +272,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
             assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
 
             Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+            assertThat(receivedMessage, is(equalTo(getTestName())));
         }
     }
 
@@ -305,7 +302,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.FALSE)
                        .transfer();
 
-            Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
 
             assertThat(disposition.getSettled(), is(true));
 
@@ -320,7 +317,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
             assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
 
             Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
-            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+            assertThat(receivedMessage, is(equalTo(getTestName())));
         }
     }
 
@@ -351,7 +348,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.FALSE)
                        .transfer();
 
-            Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
 
             assertThat(disposition.getSettled(), is(true));
 
@@ -400,7 +397,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.FALSE)
                        .transfer();
 
-            Detach senderLinkDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            final Detach senderLinkDetach = interaction.consume(Detach.class, Flow.class);
             Error senderLinkDetachError = senderLinkDetach.getError();
             assertThat(senderLinkDetachError, is(notNullValue()));
             assertThat(senderLinkDetachError.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -537,7 +534,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
                        .transferSettled(Boolean.TRUE)
                        .transfer().txnSendDischarge(false);
 
-            Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+            final Detach transactionCoordinatorDetach = interaction.consume(Detach.class, Flow.class);
             Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
             assertThat(transactionCoordinatorDetachError, is(notNullValue()));
             assertThat(transactionCoordinatorDetachError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
@@ -549,29 +546,6 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
         return String.format("%sNonExisting%s", getTestName(), new StringUtil().randomAlphaNumericString(10));
     }
 
-    private Disposition getDispositionForDeliveryId(final Interaction interaction,
-                                                    final UnsignedInteger deliveryId) throws Exception
-    {
-        Disposition dischargeTransactionDisposition = null;
-
-        SequenceNumber id = new SequenceNumber(deliveryId.intValue());
-        do
-        {
-            Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse();
-            if (response.getBody() instanceof Disposition)
-            {
-                Disposition disposition = (Disposition) response.getBody();
-                UnsignedInteger first = disposition.getFirst();
-                UnsignedInteger last = disposition.getLast() == null ? disposition.getFirst() : disposition.getLast();
-                if (new SequenceNumber(first.intValue()).compareTo(id) >= 0 && new SequenceNumber(last.intValue()).compareTo(id) <=0)
-                {
-                    dischargeTransactionDisposition = disposition;
-                }
-            }
-        } while (dischargeTransactionDisposition == null);
-        return dischargeTransactionDisposition;
-    }
-
     private Interaction openInteractionWithAnonymousRelayCapability(final FrameTransport transport) throws Exception
     {
         final Interaction interaction = transport.newInteraction();
@@ -590,7 +564,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
         final Properties properties = new Properties();
         properties.setTo(destinationName);
         messageEncoder.setProperties(properties);
-        messageEncoder.addData(TEST_MESSAGE_CONTENT);
+        messageEncoder.addData(getTestName());
         return messageEncoder.getPayload();
     }
 
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index a5f7e7c..536d6ab 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -48,9 +48,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.SpecificationTest;
 import org.apache.qpid.tests.protocol.v1_0.Utils;
 import org.apache.qpid.tests.utils.BrokerAdmin;
 import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -100,8 +100,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                                                  .transferMore(false)
                                                  .transferPayload(payloads[1])
                                                  .transfer()
-                                                 .consumeResponse()
-                                                 .getLatestResponse(Disposition.class);
+                                                 .consume(Disposition.class, Flow.class);
 
             for (final QpidByteBuffer payload : payloads)
             {
@@ -158,15 +157,14 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                        .transferDeliveryTag(null)
                        .transferMore(false)
                        .transferPayload(payloads[3])
-                       .transfer()
-                       .consumeResponse();
-
-            Disposition disposition = interaction.getLatestResponse(Disposition.class);
+                       .transfer().sync();
 
             for (final QpidByteBuffer payload : payloads)
             {
                 payload.dispose();
             }
+
+            Disposition disposition = interaction.consume(Disposition.class, Flow.class);
             assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
             assertThat(disposition.getLast(), oneOf(null, deliveryId));
             assertThat(disposition.getSettled(), is(equalTo(true)));
@@ -201,8 +199,8 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
+                       .transferSettled(true)
                        .transfer()
-                       .sync()
                        .transferPayload(null)
                        .transferMore(null)
                        .transferAborted(true)
@@ -304,8 +302,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
             Map<UnsignedInteger, Disposition> dispositionMap = new HashMap<>();
             for (int i = 0; i < 2; i++)
             {
-                Disposition disposition = interaction.consumeResponse(Disposition.class)
-                                                     .getLatestResponse(Disposition.class);
+                Disposition disposition = interaction.consume(Disposition.class, Flow.class);
                 dispositionMap.put(disposition.getFirst(), disposition);
 
                 assertThat(disposition.getLast(), oneOf(null, disposition.getFirst()));
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index c89898d..733ba94 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -192,8 +192,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                        .transferHandle(linkHandle)
                                                        .transferPayloadData(getTestName())
                                                        .transfer()
-                                                       .consumeResponse()
-                                                       .getLatestResponse(Disposition.class);
+                                                       .consume(Disposition.class, Flow.class);
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
             assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
@@ -245,7 +244,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                      .transferDeliveryTag(deliveryTag)
                                      .transfer();
 
-            final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+            final Disposition disposition1 = interaction.consume(Disposition.class, Flow.class);
             final UnsignedInteger first = disposition1.getFirst();
             final UnsignedInteger last = disposition1.getLast();
 
@@ -254,7 +253,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
 
             if (last == null || first.equals(last))
             {
-                final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+                final Disposition disposition2 = interaction.consume(Disposition.class, Flow.class);
                 assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
                 assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
                 assertThat(disposition2.getFirst(), is(not(equalTo(first))));
@@ -265,7 +264,8 @@ public class TransferTest extends BrokerAdminUsingTestBase
 
     @Test
     @SpecificationTest(section = "2.7.5",
-            description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
+            description = "If first, this indicates that the receiver MUST settle the delivery once"
+                          + " it has arrived without waiting for the sender to settle first")
     public void transferReceiverSettleModeFirst() throws Exception
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -282,8 +282,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                        .transferPayloadData(getTestName())
                                                        .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                        .transfer()
-                                                       .consumeResponse()
-                                                       .getLatestResponse(Disposition.class);
+                                                       .consume(Disposition.class, Flow.class);
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
             assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
@@ -585,8 +584,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                  .dispositionRole(Role.RECEIVER)
                                                  .dispositionState(new Accepted())
                                                  .disposition()
-                                                 .consumeResponse(Disposition.class)
-                                                 .getLatestResponse(Disposition.class);
+                                                 .consume(Disposition.class, Flow.class);
             assertThat(disposition.getSettled(), is(true));
 
             interaction.dispositionSettled(true)
@@ -689,8 +687,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                                                  .dispositionRole(Role.RECEIVER)
                                                  .dispositionState(null)
                                                  .disposition()
-                                                 .consumeResponse(Disposition.class)
-                                                 .getLatestResponse(Disposition.class);
+                                                 .consume(Disposition.class, Flow.class);
             assertThat(disposition.getSettled(), is(true));
 
             interaction.consumeResponse(null, Flow.class);
@@ -836,8 +833,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .consumeResponse(Attach.class)
                        .assertLatestResponse(Attach.class, this::assumeReceiverSettlesSecond)
                        .consumeResponse(Flow.class)
-                       .assertLatestResponse(Flow.class,
-                                             flow -> assumeThat(flow.getLinkCredit().intValue(), is(greaterThan(1))))
+                       .assertLatestResponse(Flow.class, this::assumeCreditsGreaterThanOne)
                        .transferDeliveryId()
                        .transferDeliveryTag(deliveryTag)
                        .transferPayloadData(content1)
@@ -898,12 +894,9 @@ public class TransferTest extends BrokerAdminUsingTestBase
                        .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
                        .attach()
                        .consumeResponse(Attach.class)
-                       .consumeResponse(Flow.class);
-
-            Flow flow = interaction.getLatestResponse(Flow.class);
-            assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
-
-            interaction.transferDeliveryId(UnsignedInteger.ZERO)
+                       .consumeResponse(Flow.class)
+                       .assertLatestResponse(Flow.class, this::assumeCreditsGreaterThanOne)
+                       .transferDeliveryId(UnsignedInteger.ZERO)
                        .transferDeliveryTag(deliveryTag)
                        .transferPayloadData(contents[0])
                        .transferSettled(true)
@@ -1202,24 +1195,16 @@ public class TransferTest extends BrokerAdminUsingTestBase
     {
         do
         {
-            Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse();
-            if (response.getBody() instanceof Disposition)
-            {
-                Disposition disposition = (Disposition) response.getBody();
-                LongStream.rangeClosed(disposition.getFirst().longValue(),
-                                       disposition.getLast() == null
-                                               ? disposition.getFirst().longValue()
-                                               : disposition.getLast().longValue())
-                          .forEach(value -> {
-                              UnsignedInteger deliveryId = expectedDeliveryIds.first();
-                              assertThat(value, is(equalTo(deliveryId.longValue())));
-                              expectedDeliveryIds.remove(deliveryId);
-                          });
-            }
-            else if (response.getBody() instanceof Flow)
-            {
-                // ignore flows
-            }
+            Disposition disposition = interaction.consume(Disposition.class, Flow.class);
+            LongStream.rangeClosed(disposition.getFirst().longValue(),
+                                   disposition.getLast() == null
+                                           ? disposition.getFirst().longValue()
+                                           : disposition.getLast().longValue())
+                      .forEach(value -> {
+                          UnsignedInteger deliveryId = expectedDeliveryIds.first();
+                          assertThat(value, is(equalTo(deliveryId.longValue())));
+                          expectedDeliveryIds.remove(deliveryId);
+                      });
         }
         while (!expectedDeliveryIds.isEmpty());
     }
@@ -1243,9 +1228,16 @@ public class TransferTest extends BrokerAdminUsingTestBase
 
     private void assumeSufficientCredits(final Flow flow)
     {
+        assumeThat(flow.getLinkCredit(), is(notNullValue()));
         assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO)));
     }
 
+    private void assumeCreditsGreaterThanOne(final Flow flow)
+    {
+        assumeThat(flow.getLinkCredit(), is(notNullValue()));
+        assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ONE)));
+    }
+
     private void assumeReceiverSettlesSecond(final Attach attach)
     {
         assumeThat(attach.getRcvSettleMode(), is(equalTo(ReceiverSettleMode.SECOND)));
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 2036d1e..a22c8fc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -228,11 +228,11 @@ public class DischargeTest extends BrokerAdminUsingTestBase
                        .transferTransactionalStateFromCurrentTransaction()
                        .transferPayloadData(getTestName())
                        .transferHandle(UnsignedInteger.ONE)
-                       .transfer().consumeResponse(Disposition.class)
+                       .transfer().consume(Disposition.class, Flow.class);
 
-                       .detachHandle(UnsignedInteger.ONE)
-                       .detach().consumeResponse(Detach.class);
-            interaction.txnDischarge(false);
+            interaction.detachHandle(UnsignedInteger.ONE)
+                       .detach().consumeResponse(Detach.class)
+                       .txnDischarge(false);
 
             assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
         }
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 6a334dc..0827209 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -37,6 +37,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -106,8 +107,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                                          .transferPayloadData(getTestName())
                                                          .transferTransactionalStateFromCurrentTransaction()
                                                          .transfer()
-                                                         .consumeResponse(Disposition.class)
-                                                         .getLatestResponse(Disposition.class);
+                                                         .consume(Disposition.class, Flow.class);
 
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
@@ -155,8 +155,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                                          .transferPayloadData(getTestName())
                                                          .transferTransactionalStateFromCurrentTransaction()
                                                          .transfer()
-                                                         .consumeResponse(Disposition.class)
-                                                         .getLatestResponse(Disposition.class);
+                                                         .consume(Disposition.class, Flow.class);
 
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
@@ -242,7 +241,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
 
             final Interaction interaction = transport.newInteraction();
 
-            Response<?> response = interaction.negotiateProtocol().consumeResponse()
+            ErrorCarryingFrameBody response = interaction.negotiateProtocol().consumeResponse()
                                               .open().consumeResponse(Open.class)
                                               .begin().consumeResponse(Begin.class)
 
@@ -260,10 +259,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                               .transferPayloadData(getTestName())
                                               .transferTransactionalState(integerToBinary(Integer.MAX_VALUE))
                                               .transfer()
-                                              .consumeResponse()
-                                              .getLatestResponse();
+                                              .consume(ErrorCarryingFrameBody.class, Flow.class);
 
-            assertUnknownTransactionIdError(response);
+            final Error error = response.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
         }
     }
 
@@ -413,14 +413,17 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
             Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
             assertThat(data, is(equalTo(getTestName())));
 
-            Response<?> response = interaction.dispositionSettled(true)
+            ErrorCarryingFrameBody response = interaction.dispositionSettled(true)
                                               .dispositionRole(Role.RECEIVER)
                                               .dispositionTransactionalState(integerToBinary(Integer.MAX_VALUE),
                                                                              new Accepted())
                                               .dispositionFirst(deliveryId)
                                               .disposition()
-                                              .consumeResponse().getLatestResponse();
-            assertUnknownTransactionIdError(response);
+                                              .consume(ErrorCarryingFrameBody.class, Flow.class);
+
+            final Error error = response.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
         }
         finally
         {
@@ -630,7 +633,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
             final Interaction interaction = transport.newInteraction();
-            Response<?> response = interaction.negotiateProtocol()
+            ErrorCarryingFrameBody response = interaction.negotiateProtocol()
                                               .consumeResponse()
                                               .open()
                                               .consumeResponse(Open.class)
@@ -653,10 +656,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
                                               .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"),
                                                                                        integerToBinary(Integer.MAX_VALUE)))
                                               .flow()
-                                              .consumeResponse()
-                                              .getLatestResponse();
+                                              .consume(ErrorCarryingFrameBody.class, Flow.class);
 
-            assertUnknownTransactionIdError(response);
+            final Error error = response.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
         }
         finally
         {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org