You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/23 13:04:46 UTC
[qpid-broker-j] 03/05: QPID-8350: [Tests][AMQP 1.0] Ignore sporadic
flow perfromatives in transfer tests
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 1e416ebf2dc1a91ea3f1dc7332a66ee9de9e8316
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:28:06 2019 +0100
QPID-8350: [Tests][AMQP 1.0] Ignore sporadic flow perfromatives in transfer tests
---
.../qpid/tests/protocol/v1_0/Interaction.java | 20 +++++--
.../anonymousterminus/AnonymousTerminusTest.java | 54 +++++-------------
.../protocol/v1_0/messaging/MultiTransferTest.java | 17 +++---
.../protocol/v1_0/messaging/TransferTest.java | 66 ++++++++++------------
.../protocol/v1_0/transaction/DischargeTest.java | 8 +--
.../transaction/TransactionalTransferTest.java | 34 ++++++-----
6 files changed, 87 insertions(+), 112 deletions(-)
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 8c7709d..57d90d2 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -1054,19 +1054,25 @@ public class Interaction extends AbstractInteraction<Interaction>
private DeliveryState handleCoordinatorResponse() throws Exception
{
final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class));
-
if (_coordinatorCredits.decrementAndGet() == 0)
{
expected.add(Flow.class);
}
- final Map<Class<?>, ?> responses = consumeResponses(expected);
+ final Map<Class<?>, ?> responses = consumeResponses(expected, Collections.singleton(Flow.class));
final Disposition disposition = (Disposition) responses.get(Disposition.class);
if (expected.contains(Flow.class))
{
Flow flow = (Flow) responses.get(Flow.class);
- _coordinatorCredits.set(flow.getLinkCredit().longValue());
+ if (flow.getHandle().equals(getCoordinatorHandle()))
+ {
+ final UnsignedInteger linkCredit = flow.getLinkCredit();
+ if (linkCredit != null)
+ {
+ _coordinatorCredits.set(linkCredit.longValue());
+ }
+ }
}
if (!Boolean.TRUE.equals(disposition.getSettled()))
{
@@ -1075,13 +1081,15 @@ public class Interaction extends AbstractInteraction<Interaction>
return disposition.getState();
}
- private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes)
+ private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes, Set<Class<?>> ignore)
throws Exception
{
- Map<Class<?>, Object> results = new HashMap<>();
+ final Map<Class<?>, Object> results = new HashMap<>();
+ final Set<Class<?>> expected = new HashSet<>(responseTypes);
+ expected.addAll(ignore);
do
{
- Response<?> response = consumeResponse(responseTypes).getLatestResponse();
+ Response<?> response = consumeResponse(expected).getLatestResponse();
if (response != null && response.getBody() instanceof FrameBody)
{
Class<?> bodyClass = response.getBody().getClass();
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
index 369abf7..10a700c 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/extensions/anonymousterminus/AnonymousTerminusTest.java
@@ -38,7 +38,6 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.protocol.v1_0.SequenceNumber;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -59,7 +58,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.util.StringUtil;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
@@ -72,7 +70,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
{
private static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
private static final Symbol DELIVERY_TAG = Symbol.valueOf("delivery-tag");
- private static final String TEST_MESSAGE_CONTENT = "test";
+
private InetSocketAddress _brokerAddress;
private Binary _deliveryTag;
@@ -110,8 +108,8 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transfer()
.sync();
- Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME),
+ is(equalTo(getTestName())));
}
}
@@ -145,7 +143,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transferDeliveryTag(_deliveryTag)
.transfer();
- Detach detach = interaction.consumeResponse(Detach.class).getLatestResponse(Detach.class);
+ final Detach detach = interaction.consume(Detach.class, Flow.class);
Error error = detach.getError();
assertThat(error, is(notNullValue()));
assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -183,10 +181,9 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transferDeliveryId()
.transferPayload(generateMessagePayloadToDestination(getNonExistingDestinationName()))
.transferDeliveryTag(_deliveryTag)
- .transfer()
- .consumeResponse();
+ .transfer();
- Disposition disposition = interaction.getLatestResponse(Disposition.class);
+ final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
assertThat(disposition.getSettled(), is(true));
@@ -233,7 +230,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transferDeliveryTag(_deliveryTag)
.transfer();
- Detach detach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ final Detach detach = interaction.consume(Detach.class, Flow.class);
Error error = detach.getError();
assertThat(error, is(notNullValue()));
assertThat(error.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -275,7 +272,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(receivedMessage, is(equalTo(getTestName())));
}
}
@@ -305,7 +302,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transferSettled(Boolean.FALSE)
.transfer();
- Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class);
+ final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
assertThat(disposition.getSettled(), is(true));
@@ -320,7 +317,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
- assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ assertThat(receivedMessage, is(equalTo(getTestName())));
}
}
@@ -351,7 +348,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transferSettled(Boolean.FALSE)
.transfer();
- Disposition disposition = interaction.consumeResponse().getLatestResponse(Disposition.class);
+ final Disposition disposition = interaction.consume(Disposition.class, Flow.class);
assertThat(disposition.getSettled(), is(true));
@@ -400,7 +397,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transferSettled(Boolean.FALSE)
.transfer();
- Detach senderLinkDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ final Detach senderLinkDetach = interaction.consume(Detach.class, Flow.class);
Error senderLinkDetachError = senderLinkDetach.getError();
assertThat(senderLinkDetachError, is(notNullValue()));
assertThat(senderLinkDetachError.getCondition(), is(equalTo(AmqpError.NOT_FOUND)));
@@ -537,7 +534,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
.transferSettled(Boolean.TRUE)
.transfer().txnSendDischarge(false);
- Detach transactionCoordinatorDetach = interaction.consumeResponse().getLatestResponse(Detach.class);
+ final Detach transactionCoordinatorDetach = interaction.consume(Detach.class, Flow.class);
Error transactionCoordinatorDetachError = transactionCoordinatorDetach.getError();
assertThat(transactionCoordinatorDetachError, is(notNullValue()));
assertThat(transactionCoordinatorDetachError.getCondition(), is(equalTo(TransactionError.TRANSACTION_ROLLBACK)));
@@ -549,29 +546,6 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
return String.format("%sNonExisting%s", getTestName(), new StringUtil().randomAlphaNumericString(10));
}
- private Disposition getDispositionForDeliveryId(final Interaction interaction,
- final UnsignedInteger deliveryId) throws Exception
- {
- Disposition dischargeTransactionDisposition = null;
-
- SequenceNumber id = new SequenceNumber(deliveryId.intValue());
- do
- {
- Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse();
- if (response.getBody() instanceof Disposition)
- {
- Disposition disposition = (Disposition) response.getBody();
- UnsignedInteger first = disposition.getFirst();
- UnsignedInteger last = disposition.getLast() == null ? disposition.getFirst() : disposition.getLast();
- if (new SequenceNumber(first.intValue()).compareTo(id) >= 0 && new SequenceNumber(last.intValue()).compareTo(id) <=0)
- {
- dischargeTransactionDisposition = disposition;
- }
- }
- } while (dischargeTransactionDisposition == null);
- return dischargeTransactionDisposition;
- }
-
private Interaction openInteractionWithAnonymousRelayCapability(final FrameTransport transport) throws Exception
{
final Interaction interaction = transport.newInteraction();
@@ -590,7 +564,7 @@ public class AnonymousTerminusTest extends BrokerAdminUsingTestBase
final Properties properties = new Properties();
properties.setTo(destinationName);
messageEncoder.setProperties(properties);
- messageEncoder.addData(TEST_MESSAGE_CONTENT);
+ messageEncoder.addData(getTestName());
return messageEncoder.getPayload();
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index a5f7e7c..536d6ab 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -48,9 +48,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.ChannelClosedResponse;
+import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.Utils;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -100,8 +100,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
.transferMore(false)
.transferPayload(payloads[1])
.transfer()
- .consumeResponse()
- .getLatestResponse(Disposition.class);
+ .consume(Disposition.class, Flow.class);
for (final QpidByteBuffer payload : payloads)
{
@@ -158,15 +157,14 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
.transferDeliveryTag(null)
.transferMore(false)
.transferPayload(payloads[3])
- .transfer()
- .consumeResponse();
-
- Disposition disposition = interaction.getLatestResponse(Disposition.class);
+ .transfer().sync();
for (final QpidByteBuffer payload : payloads)
{
payload.dispose();
}
+
+ Disposition disposition = interaction.consume(Disposition.class, Flow.class);
assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
assertThat(disposition.getLast(), oneOf(null, deliveryId));
assertThat(disposition.getSettled(), is(equalTo(true)));
@@ -201,8 +199,8 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
.transferDeliveryId(deliveryId)
.transferDeliveryTag(deliveryTag)
.transferMore(true)
+ .transferSettled(true)
.transfer()
- .sync()
.transferPayload(null)
.transferMore(null)
.transferAborted(true)
@@ -304,8 +302,7 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
Map<UnsignedInteger, Disposition> dispositionMap = new HashMap<>();
for (int i = 0; i < 2; i++)
{
- Disposition disposition = interaction.consumeResponse(Disposition.class)
- .getLatestResponse(Disposition.class);
+ Disposition disposition = interaction.consume(Disposition.class, Flow.class);
dispositionMap.put(disposition.getFirst(), disposition);
assertThat(disposition.getLast(), oneOf(null, disposition.getFirst()));
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index c89898d..733ba94 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -192,8 +192,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.transferHandle(linkHandle)
.transferPayloadData(getTestName())
.transfer()
- .consumeResponse()
- .getLatestResponse(Disposition.class);
+ .consume(Disposition.class, Flow.class);
assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
@@ -245,7 +244,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.transferDeliveryTag(deliveryTag)
.transfer();
- final Disposition disposition1 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+ final Disposition disposition1 = interaction.consume(Disposition.class, Flow.class);
final UnsignedInteger first = disposition1.getFirst();
final UnsignedInteger last = disposition1.getLast();
@@ -254,7 +253,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
if (last == null || first.equals(last))
{
- final Disposition disposition2 = interaction.consumeResponse().getLatestResponse(Disposition.class);
+ final Disposition disposition2 = interaction.consume(Disposition.class, Flow.class);
assertThat(disposition2.getFirst(), anyOf(is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
assertThat(disposition2.getLast(), anyOf(nullValue(), is(UnsignedInteger.ZERO), is(UnsignedInteger.ONE)));
assertThat(disposition2.getFirst(), is(not(equalTo(first))));
@@ -265,7 +264,8 @@ public class TransferTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "2.7.5",
- description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
+ description = "If first, this indicates that the receiver MUST settle the delivery once"
+ + " it has arrived without waiting for the sender to settle first")
public void transferReceiverSettleModeFirst() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -282,8 +282,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.transferPayloadData(getTestName())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
- .consumeResponse()
- .getLatestResponse(Disposition.class);
+ .consume(Disposition.class, Flow.class);
assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
@@ -585,8 +584,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.dispositionRole(Role.RECEIVER)
.dispositionState(new Accepted())
.disposition()
- .consumeResponse(Disposition.class)
- .getLatestResponse(Disposition.class);
+ .consume(Disposition.class, Flow.class);
assertThat(disposition.getSettled(), is(true));
interaction.dispositionSettled(true)
@@ -689,8 +687,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.dispositionRole(Role.RECEIVER)
.dispositionState(null)
.disposition()
- .consumeResponse(Disposition.class)
- .getLatestResponse(Disposition.class);
+ .consume(Disposition.class, Flow.class);
assertThat(disposition.getSettled(), is(true));
interaction.consumeResponse(null, Flow.class);
@@ -836,8 +833,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
.consumeResponse(Attach.class)
.assertLatestResponse(Attach.class, this::assumeReceiverSettlesSecond)
.consumeResponse(Flow.class)
- .assertLatestResponse(Flow.class,
- flow -> assumeThat(flow.getLinkCredit().intValue(), is(greaterThan(1))))
+ .assertLatestResponse(Flow.class, this::assumeCreditsGreaterThanOne)
.transferDeliveryId()
.transferDeliveryTag(deliveryTag)
.transferPayloadData(content1)
@@ -898,12 +894,9 @@ public class TransferTest extends BrokerAdminUsingTestBase
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach()
.consumeResponse(Attach.class)
- .consumeResponse(Flow.class);
-
- Flow flow = interaction.getLatestResponse(Flow.class);
- assertThat(flow.getLinkCredit().intValue(), is(greaterThan(1)));
-
- interaction.transferDeliveryId(UnsignedInteger.ZERO)
+ .consumeResponse(Flow.class)
+ .assertLatestResponse(Flow.class, this::assumeCreditsGreaterThanOne)
+ .transferDeliveryId(UnsignedInteger.ZERO)
.transferDeliveryTag(deliveryTag)
.transferPayloadData(contents[0])
.transferSettled(true)
@@ -1202,24 +1195,16 @@ public class TransferTest extends BrokerAdminUsingTestBase
{
do
{
- Response<?> response = interaction.consumeResponse(Disposition.class, Flow.class).getLatestResponse();
- if (response.getBody() instanceof Disposition)
- {
- Disposition disposition = (Disposition) response.getBody();
- LongStream.rangeClosed(disposition.getFirst().longValue(),
- disposition.getLast() == null
- ? disposition.getFirst().longValue()
- : disposition.getLast().longValue())
- .forEach(value -> {
- UnsignedInteger deliveryId = expectedDeliveryIds.first();
- assertThat(value, is(equalTo(deliveryId.longValue())));
- expectedDeliveryIds.remove(deliveryId);
- });
- }
- else if (response.getBody() instanceof Flow)
- {
- // ignore flows
- }
+ Disposition disposition = interaction.consume(Disposition.class, Flow.class);
+ LongStream.rangeClosed(disposition.getFirst().longValue(),
+ disposition.getLast() == null
+ ? disposition.getFirst().longValue()
+ : disposition.getLast().longValue())
+ .forEach(value -> {
+ UnsignedInteger deliveryId = expectedDeliveryIds.first();
+ assertThat(value, is(equalTo(deliveryId.longValue())));
+ expectedDeliveryIds.remove(deliveryId);
+ });
}
while (!expectedDeliveryIds.isEmpty());
}
@@ -1243,9 +1228,16 @@ public class TransferTest extends BrokerAdminUsingTestBase
private void assumeSufficientCredits(final Flow flow)
{
+ assumeThat(flow.getLinkCredit(), is(notNullValue()));
assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ZERO)));
}
+ private void assumeCreditsGreaterThanOne(final Flow flow)
+ {
+ assumeThat(flow.getLinkCredit(), is(notNullValue()));
+ assumeThat(flow.getLinkCredit(), is(greaterThan(UnsignedInteger.ONE)));
+ }
+
private void assumeReceiverSettlesSecond(final Attach attach)
{
assumeThat(attach.getRcvSettleMode(), is(equalTo(ReceiverSettleMode.SECOND)));
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index 2036d1e..a22c8fc 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -228,11 +228,11 @@ public class DischargeTest extends BrokerAdminUsingTestBase
.transferTransactionalStateFromCurrentTransaction()
.transferPayloadData(getTestName())
.transferHandle(UnsignedInteger.ONE)
- .transfer().consumeResponse(Disposition.class)
+ .transfer().consume(Disposition.class, Flow.class);
- .detachHandle(UnsignedInteger.ONE)
- .detach().consumeResponse(Detach.class);
- interaction.txnDischarge(false);
+ interaction.detachHandle(UnsignedInteger.ONE)
+ .detach().consumeResponse(Detach.class)
+ .txnDischarge(false);
assertThat(interaction.getCoordinatorLatestDeliveryState(), is(instanceOf(Accepted.class)));
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 6a334dc..0827209 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -37,6 +37,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -106,8 +107,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
.transferPayloadData(getTestName())
.transferTransactionalStateFromCurrentTransaction()
.transfer()
- .consumeResponse(Disposition.class)
- .getLatestResponse(Disposition.class);
+ .consume(Disposition.class, Flow.class);
assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
@@ -155,8 +155,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
.transferPayloadData(getTestName())
.transferTransactionalStateFromCurrentTransaction()
.transfer()
- .consumeResponse(Disposition.class)
- .getLatestResponse(Disposition.class);
+ .consume(Disposition.class, Flow.class);
assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
@@ -242,7 +241,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
final Interaction interaction = transport.newInteraction();
- Response<?> response = interaction.negotiateProtocol().consumeResponse()
+ ErrorCarryingFrameBody response = interaction.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
@@ -260,10 +259,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
.transferPayloadData(getTestName())
.transferTransactionalState(integerToBinary(Integer.MAX_VALUE))
.transfer()
- .consumeResponse()
- .getLatestResponse();
+ .consume(ErrorCarryingFrameBody.class, Flow.class);
- assertUnknownTransactionIdError(response);
+ final Error error = response.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
}
}
@@ -413,14 +413,17 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
assertThat(data, is(equalTo(getTestName())));
- Response<?> response = interaction.dispositionSettled(true)
+ ErrorCarryingFrameBody response = interaction.dispositionSettled(true)
.dispositionRole(Role.RECEIVER)
.dispositionTransactionalState(integerToBinary(Integer.MAX_VALUE),
new Accepted())
.dispositionFirst(deliveryId)
.disposition()
- .consumeResponse().getLatestResponse();
- assertUnknownTransactionIdError(response);
+ .consume(ErrorCarryingFrameBody.class, Flow.class);
+
+ final Error error = response.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
}
finally
{
@@ -630,7 +633,7 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
final Interaction interaction = transport.newInteraction();
- Response<?> response = interaction.negotiateProtocol()
+ ErrorCarryingFrameBody response = interaction.negotiateProtocol()
.consumeResponse()
.open()
.consumeResponse(Open.class)
@@ -653,10 +656,11 @@ public class TransactionalTransferTest extends BrokerAdminUsingTestBase
.flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"),
integerToBinary(Integer.MAX_VALUE)))
.flow()
- .consumeResponse()
- .getLatestResponse();
+ .consume(ErrorCarryingFrameBody.class, Flow.class);
- assertUnknownTransactionIdError(response);
+ final Error error = response.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), equalTo(TransactionError.UNKNOWN_ID));
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org