You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2019/08/23 13:04:47 UTC
[qpid-broker-j] 04/05: QPID-8350: [Tests][AMQP 1.0] Improve
handling of amqp errors in tests
This is an automated email from the ASF dual-hosted git repository.
orudyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-broker-j.git
commit 605e727da226b503db95d3d2078e6196caf1aab6
Author: Alex Rudyy <or...@apache.org>
AuthorDate: Fri Aug 23 13:39:35 2019 +0100
QPID-8350: [Tests][AMQP 1.0] Improve handling of amqp errors in tests
---
.../qpid/tests/protocol/v1_0/DecodeErrorTest.java | 148 +++++++++------------
.../protocol/v1_0/messaging/MessageFormat.java | 42 ++----
.../protocol/v1_0/messaging/MultiTransferTest.java | 29 +++-
.../protocol/v1_0/messaging/TransferTest.java | 120 ++++++++++-------
.../v1_0/transport/connection/OpenTest.java | 23 ++--
.../protocol/v1_0/transport/link/AttachTest.java | 33 +++--
.../protocol/v1_0/transport/link/FlowTest.java | 35 +++--
.../v1_0/transport/link/LinkStealingTest.java | 5 +-
.../protocol/v1_0/transport/session/BeginTest.java | 17 ++-
9 files changed, 238 insertions(+), 214 deletions(-)
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
index ce604c9..c0aaf45 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/DecodeErrorTest.java
@@ -25,8 +25,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
@@ -40,6 +40,7 @@ import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.codec.StringWriter;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
@@ -52,13 +53,11 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.utils.BrokerAdmin;
@@ -91,6 +90,7 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
.begin()
.consumeResponse(Begin.class)
.attachRole(Role.SENDER)
+ .attachSndSettleMode(SenderSettleMode.SETTLED)
.attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
.attach()
.consumeResponse(Attach.class)
@@ -99,31 +99,24 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
flow -> assumeThat(flow.getLinkCredit(),
is(greaterThan(UnsignedInteger.ZERO))));
- final List<QpidByteBuffer> payloads = buildInvalidMessage();
- try
- {
- try (QpidByteBuffer combinedPayload = QpidByteBuffer.concatenate(payloads))
- {
- interaction.transferMessageFormat(UnsignedInteger.ZERO)
- .transferPayload(combinedPayload)
- .transfer();
- }
- }
- finally
+ try(final QpidByteBuffer payload = buildInvalidMessage())
{
- payloads.forEach(QpidByteBuffer::dispose);
+ interaction.transferMessageFormat(UnsignedInteger.ZERO)
+ .transferPayload(payload)
+ .transfer();
}
- final Detach detachResponse = interaction.consumeResponse()
- .getLatestResponse(Detach.class);
- assertThat(detachResponse.getError(), is(notNullValue()));
- assertThat(detachResponse.getError().getCondition(), is(equalTo(DECODE_ERROR)));
}
+
+ final String validMessage = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, validMessage);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(validMessage)));
}
@Test
@SpecificationTest(section = "3.5.9",
- description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
+ description = "Node Properties [...] lifetime-policy [...] "
+ + "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
public void nodePropertiesLifetimePolicy() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -143,24 +136,10 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
assertThat(latestResponse, is(notNullValue()));
final Object responseBody = latestResponse.getBody();
- final Error error;
- if (responseBody instanceof End)
- {
- error = ((End) responseBody).getError();
- }
- else if (responseBody instanceof Close)
- {
- error = ((Close) responseBody).getError();
- }
- else if (responseBody instanceof Detach)
- {
- error = ((Detach) responseBody).getError();
- }
- else
- {
- fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
- error = null;
- }
+ assertThat(responseBody, is(notNullValue()));
+ assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class));
+
+ final Error error = ((ErrorCarryingFrameBody) responseBody).getError();
assertThat(error, is(notNullValue()));
assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
@@ -169,7 +148,8 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "3.5.9",
- description = "The value of this entry MUST be of a type which provides the lifetime-policy archetype.")
+ description = "Node Properties [...] supported-dist-modes [...] "
+ + "The value of this entry MUST be one or more symbols which are valid distribution-modes.")
public void nodePropertiesSupportedDistributionModes() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -189,70 +169,66 @@ public class DecodeErrorTest extends BrokerAdminUsingTestBase
assertThat(latestResponse, is(notNullValue()));
final Object responseBody = latestResponse.getBody();
- final Error error;
- if (responseBody instanceof End)
- {
- error = ((End) responseBody).getError();
- }
- else if (responseBody instanceof Close)
- {
- error = ((Close) responseBody).getError();
- }
- else
- {
- fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
- error = null;
- }
+ assertThat(responseBody, is(notNullValue()));
+ assertThat(responseBody, instanceOf(ErrorCarryingFrameBody.class));
+ final Error error = ((ErrorCarryingFrameBody) responseBody).getError();
assertThat(error, is(notNullValue()));
assertThat(error.getCondition(), is(equalTo(DECODE_ERROR)));
}
}
- private List<QpidByteBuffer> buildInvalidMessage()
+ private QpidByteBuffer buildInvalidMessage()
{
final List<QpidByteBuffer> payloads = new ArrayList<>();
- final Header header = new Header();
- header.setTtl(UnsignedInteger.valueOf(1000L));
- final HeaderSection headerSection = header.createEncodingRetainingSection();
try
{
- payloads.add(headerSection.getEncodedForm());
- }
- finally
- {
- headerSection.dispose();
- }
+ final Header header = new Header();
+ header.setTtl(UnsignedInteger.valueOf(10000L));
+ final HeaderSection headerSection = header.createEncodingRetainingSection();
+ try
+ {
+ payloads.add(headerSection.getEncodedForm());
+ }
+ finally
+ {
+ headerSection.dispose();
+ }
- final StringWriter stringWriter = new StringWriter("string in between annotation sections");
- QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
- stringWriter.writeToBuffer(encodedString);
- encodedString.flip();
- payloads.add(encodedString);
+ final StringWriter stringWriter = new StringWriter("string in between message sections");
+ final QpidByteBuffer encodedString = QpidByteBuffer.allocate(stringWriter.getEncodedSize());
+ stringWriter.writeToBuffer(encodedString);
+ encodedString.flip();
+ payloads.add(encodedString);
- final Map<Symbol, Object> annoationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
- final DeliveryAnnotations annotations = new DeliveryAnnotations(annoationMap);
- final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
- try
- {
+ final Map<Symbol, Object> annotationMap = Collections.singletonMap(Symbol.valueOf("foo"), "bar");
+ final DeliveryAnnotations annotations = new DeliveryAnnotations(annotationMap);
+ final DeliveryAnnotationsSection deliveryAnnotationsSection = annotations.createEncodingRetainingSection();
+ try
+ {
+ payloads.add(deliveryAnnotationsSection.getEncodedForm());
+ }
+ finally
+ {
+ deliveryAnnotationsSection.dispose();
+ }
- payloads.add(deliveryAnnotationsSection.getEncodedForm());
- }
- finally
- {
- deliveryAnnotationsSection.dispose();
- }
+ final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
+ try
+ {
+ payloads.add(payload.getEncodedForm());
+ }
+ finally
+ {
+ payload.dispose();
+ }
- final AmqpValueSection payload = new AmqpValue(getTestName()).createEncodingRetainingSection();
- try
- {
- payloads.add(payload.getEncodedForm());
+ return QpidByteBuffer.concatenate(payloads);
}
finally
{
- payload.dispose();
+ payloads.forEach(QpidByteBuffer::dispose);
}
- return payloads;
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
index 6ac8e7f..5539291 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MessageFormat.java
@@ -20,10 +20,9 @@
package org.apache.qpid.tests.protocol.v1_0.messaging;
-import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.fail;
import java.net.InetSocketAddress;
@@ -34,18 +33,13 @@ import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Utils;
-import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
@@ -72,7 +66,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase
{
QpidByteBuffer[] payloads = Utils.splitPayload(getTestName(), 2);
- final Response<?> latestResponse = transport.newInteraction()
+ transport.newInteraction()
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.begin().consumeResponse(Begin.class)
@@ -83,6 +77,7 @@ public class MessageFormat extends BrokerAdminUsingTestBase
.transferMore(true)
.transferMessageFormat(UnsignedInteger.ZERO)
.transferPayload(payloads[0])
+ .transferSettled(true)
.transfer()
.consumeResponse(null, Flow.class, Disposition.class)
.transferDeliveryTag(null)
@@ -91,35 +86,16 @@ public class MessageFormat extends BrokerAdminUsingTestBase
.transferMessageFormat(UnsignedInteger.ONE)
.transferPayload(payloads[1])
.transfer()
- .consumeResponse(Detach.class, End.class, Close.class)
- .getLatestResponse();
+ .sync();
for (final QpidByteBuffer payload : payloads)
{
payload.dispose();
}
- assertThat(latestResponse, is(notNullValue()));
- final Object responseBody = latestResponse.getBody();
- final Error error;
- if (responseBody instanceof Detach)
- {
- error = ((Detach) responseBody).getError();
- }
- else if (responseBody instanceof End)
- {
- error = ((End) responseBody).getError();
- }
- else if (responseBody instanceof Close)
- {
- error = ((Close) responseBody).getError();
- }
- else
- {
- fail(String.format("Expected response of either Detach, End, or Close. Got '%s'", responseBody));
- error = null;
- }
-
- assertThat(error, is(notNullValue()));
}
+
+ final String testMessage = getTestName() + "_2";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, testMessage);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(testMessage)));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 536d6ab..766afe5 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -210,7 +210,6 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
{
payload.dispose();
}
- interaction.consumeResponse(null, Flow.class);
}
String secondMessage = getTestName() + "_2";
Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, secondMessage);
@@ -320,7 +319,12 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "2.6.14",
- description = "[...]messages transferred along a single link MUST NOT be interleaved")
+ description = "For messages that are too large to fit within the maximum frame size,"
+ + " additional data MAY be transferred in additional transfer frames by setting"
+ + " the more flag on all but the last transfer frame."
+ + " When a message is split up into multiple transfer frames in this manner,"
+ + " messages being transferred along different links MAY be interleaved."
+ + " However, messages transferred along a single link MUST NOT be interleaved.")
public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception
{
String messageContent1 = getTestName() + "_1";
@@ -351,14 +355,28 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
.transferDeliveryTag(deliveryTag1)
.transferMore(true)
.transferPayload(messagePayload1[0])
+ .transferSettled(true)
.transfer()
- .sync()
.transferDeliveryId(deliveryId2)
.transferDeliveryTag(deliveryTag2)
.transferMore(true)
+ .transferSettled(true)
.transferPayload(messagePayload2[0])
.transfer()
+
+ .transferDeliveryId(deliverId1)
+ .transferDeliveryTag(deliveryTag1)
+ .transferMore(false)
+ .transferPayload(messagePayload1[1])
+ .transfer()
+
+ .transferDeliveryId(deliveryId2)
+ .transferDeliveryTag(deliveryTag2)
+ .transferMore(false)
+ .transferPayload(messagePayload2[1])
+ .transfer()
+
.sync();
for (final QpidByteBuffer payload : messagePayload1)
{
@@ -369,7 +387,10 @@ public class MultiTransferTest extends BrokerAdminUsingTestBase
payload.dispose();
}
- interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
}
+
+ final String controlMessage = getTestName() + "_Control";
+ Utils.putMessageOnQueue(getBrokerAdmin(), BrokerAdmin.TEST_QUEUE_NAME, controlMessage);
+ assertThat(Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(controlMessage)));
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index 733ba94..6332ce4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -68,7 +68,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
-import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
@@ -77,7 +76,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.tests.protocol.ChannelClosedResponse;
import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.SpecificationTest;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -106,7 +104,7 @@ public class TransferTest extends BrokerAdminUsingTestBase
@Test
@SpecificationTest(section = "1.3.4",
description = "mandatory [...] a non null value for the field is always encoded.")
- public void emptyTransfer() throws Exception
+ public void transferHandleUnspecified() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
@@ -123,25 +121,43 @@ public class TransferTest extends BrokerAdminUsingTestBase
.consumeResponse()
.getLatestResponse();
+ assertThat(response, is(notNullValue()));
assertThat(response.getBody(), is(notNullValue()));
+ assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
- if (response.getBody() instanceof Close)
- {
- final Close responseClose = (Close)response.getBody();
- assertThat(responseClose.getError(), is(notNullValue()));
- assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+ final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+ }
+ }
- interact.close().sync();
- }
- else if (response.getBody() instanceof End)
- {
- final End responseEnd = (End)response.getBody();
- assertThat(responseEnd.getError(), is(notNullValue()));
- assertThat(responseEnd.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+ @Test
+ @SpecificationTest(section = "2.7.5",
+ description = "The delivery-id MUST be supplied on the first transfer of a multi-transfer delivery.")
+ public void transferDeliveryIdUnspecified() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ Interaction interact = transport.newInteraction();
+ Response<?> response = interact.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
+ .transferDeliveryId(null)
+ .transfer()
+ .consumeResponse()
+ .getLatestResponse();
- interact.end().doCloseConnection();
- }
- transport.assertNoMoreResponses();
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getBody(), is(notNullValue()));
+ assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+
+ final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
}
}
@@ -149,30 +165,40 @@ public class TransferTest extends BrokerAdminUsingTestBase
@SpecificationTest(section = "2.7.5",
description = "[delivery-tag] MUST be specified for the first transfer "
+ "[...] and can only be omitted for continuation transfers.")
- public void transferWithoutDeliveryTag() throws Exception
+ public void transferDeliveryTagUnspecified() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
Interaction interaction = transport.newInteraction()
- .negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
- .attachRole(Role.SENDER)
- .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attach().consumeResponse(Attach.class)
- .consumeResponse(Flow.class)
- .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
- .transferDeliveryId()
- .transferDeliveryTag(null)
- .transferPayloadData(getTestName())
- .transfer();
- interaction.consumeResponse(Detach.class, End.class, Close.class, ChannelClosedResponse.class);
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .assertLatestResponse(Flow.class, this::assumeSufficientCredits)
+ .transferDeliveryId()
+ .transferDeliveryTag(null)
+ .transferPayloadData(getTestName())
+ .transfer()
+ .consumeResponse();
+
+ final Response<?> response = interaction.getLatestResponse();
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getBody(), is(notNullValue()));
+ assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+
+ final Error error = ((ErrorCarryingFrameBody)response.getBody()).getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
}
}
@Test
- @SpecificationTest(section = "2.6.12",
- description = "Transferring A Message.")
+ @SpecificationTest(section = "2.6.12 Transferring A Message",
+ description = "[...] the receiving application chooses to settle immediately upon processing the message"
+ + " rather than waiting for the sender to settle first, that yields an at-least-once guarantee.")
public void transferUnsettled() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
@@ -319,27 +345,19 @@ public class TransferTest extends BrokerAdminUsingTestBase
.consumeResponse()
.getLatestResponse();
- if (response.getBody() instanceof Detach)
- {
- final Detach detach = (Detach) response.getBody();
- Error error = detach.getError();
- assertThat(error, is(notNullValue()));
- assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
- }
- else
- {
- if (response.getBody() instanceof Disposition)
- {
- // clean up
- Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
- }
- fail("it is illegal to set transfer 'rcv-settle-mode' to 'second' when link 'rcv-settle-mode' is set to 'first'");
- }
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getBody(), is(notNullValue()));
+ assertThat(response.getBody(), is(instanceOf(Detach.class)));
+
+ final Detach detach = (Detach) response.getBody();
+ Error error = detach.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
}
}
@Test
- @SpecificationTest(section = "", description = "Pipelined message send")
+ @SpecificationTest(section = "2.6.12 Transferring A Message", description = "Pipelined message send")
public void presettledPipelined() throws Exception
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
index 17fdab0..f0f46f4 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/connection/OpenTest.java
@@ -25,6 +25,7 @@ import static org.hamcrest.CoreMatchers.both;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -74,7 +75,7 @@ public class OpenTest extends BrokerAdminUsingTestBase
Error error = responseClose.getError();
if (error != null)
{
- assertThat(error.getCondition(), equalTo(AmqpError.DECODE_ERROR));
+ assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
}
}
}
@@ -89,16 +90,18 @@ public class OpenTest extends BrokerAdminUsingTestBase
try (FrameTransport transport = new FrameTransport(addr).connect())
{
Interaction interaction = transport.newInteraction();
- Open responseOpen = interaction
- .negotiateProtocol().consumeResponse()
- .openContainerId("testContainerId")
- .open().consumeResponse()
- .getLatestResponse(Open.class);
+ final Open responseOpen = interaction.negotiateProtocol().consumeResponse()
+ .openContainerId("testContainerId")
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
+
assertThat(responseOpen.getContainerId(), is(notNullValue()));
- assertThat(responseOpen.getMaxFrameSize().longValue(),
- is(both(greaterThanOrEqualTo(0L)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE.longValue()))));
- assertThat(responseOpen.getChannelMax().intValue(),
- is(both(greaterThanOrEqualTo(0)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE.intValue()))));
+ assertThat(responseOpen.getMaxFrameSize(),
+ is(anyOf(nullValue(),
+ both(greaterThan(UnsignedInteger.ZERO)).and(lessThanOrEqualTo(UnsignedInteger.MAX_VALUE)))));
+ assertThat(responseOpen.getChannelMax(),
+ is(anyOf(nullValue(),
+ both(greaterThanOrEqualTo(UnsignedShort.ZERO)).and(lessThanOrEqualTo(UnsignedShort.MAX_VALUE)))));
interaction.doCloseConnection();
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
index 75f5985..a170e7e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/AttachTest.java
@@ -19,10 +19,12 @@
package org.apache.qpid.tests.protocol.v1_0.transport.link;
+import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
@@ -32,6 +34,7 @@ import java.net.InetSocketAddress;
import org.junit.Test;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -39,8 +42,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.protocol.SpecificationTest;
@@ -58,17 +63,23 @@ public class AttachTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
{
- Close responseClose = transport.newInteraction()
- .negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
- .attachRole(null)
- .attachHandle(null)
- .attachName(null)
- .attach().consumeResponse()
- .getLatestResponse(Close.class);
- assertThat(responseClose.getError(), is(notNullValue()));
- assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+ final Response<?> response = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(null)
+ .attachHandle(null)
+ .attachName(null)
+ .attach().consumeResponse()
+ .getLatestResponse();
+ assertThat(response.getBody(), is(notNullValue()));
+ assertThat(response.getBody(), instanceOf(ErrorCarryingFrameBody.class));
+ final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+ if (error != null)
+ {
+ assertThat(error.getCondition(),
+ anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+ }
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 448de42..1f9968b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -20,6 +20,7 @@
package org.apache.qpid.tests.protocol.v1_0.transport.link;
+import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
@@ -32,13 +33,14 @@ import java.net.InetSocketAddress;
import org.junit.Test;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -62,19 +64,24 @@ public class FlowTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr).connect())
{
- Close responseClose = transport.newInteraction()
- .negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
- .flowIncomingWindow(null)
- .flowNextIncomingId(null)
- .flowOutgoingWindow(null)
- .flowNextOutgoingId(null)
- .flow()
- .consumeResponse(Close.class)
- .getLatestResponse(Close.class);
- assertThat(responseClose.getError(), is(notNullValue()));
- assertThat(responseClose.getError().getCondition(), is(AmqpError.DECODE_ERROR));
+ final Response<?> response = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .flowIncomingWindow(null)
+ .flowNextIncomingId(null)
+ .flowOutgoingWindow(null)
+ .flowNextOutgoingId(null)
+ .flow()
+ .consumeResponse()
+ .getLatestResponse();
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+ final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+ if (error != null)
+ {
+ assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+ }
}
}
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
index 260a3bc..fabcfa9 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/LinkStealingTest.java
@@ -47,6 +47,7 @@ import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase;
+import org.apache.qpid.tests.utils.BrokerSpecific;
public class LinkStealingTest extends BrokerAdminUsingTestBase
{
@@ -94,7 +95,9 @@ public class LinkStealingTest extends BrokerAdminUsingTestBase
@Test
- @SpecificationTest(section = "2.6.1. Naming a link", description = "")
+ @SpecificationTest(section = "2.6.1. Naming a link",
+ description = "Qpid Broker J extended stolen behaviour on sessions")
+ @BrokerSpecific(kind = BrokerAdmin.KIND_BROKER_J)
public void subsequentAttachOnDifferentSessions() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
index f25d275..a1ea481 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/session/BeginTest.java
@@ -19,6 +19,7 @@
package org.apache.qpid.tests.protocol.v1_0.transport.session;
+import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
@@ -31,13 +32,16 @@ import java.net.InetSocketAddress;
import org.junit.Test;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCarryingFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.tests.protocol.Response;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
import org.apache.qpid.tests.utils.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
@@ -54,16 +58,21 @@ public class BeginTest extends BrokerAdminUsingTestBase
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try(FrameTransport transport = new FrameTransport(addr).connect())
{
- Close responseClose = transport.newInteraction()
+ final Response<?> response = transport.newInteraction()
.negotiateProtocol().consumeResponse()
.open().consumeResponse(Open.class)
.beginNextOutgoingId(null)
.beginIncomingWindow(null)
.beginOutgoingWindow(null)
.begin().consumeResponse()
- .getLatestResponse(Close.class);
- assumeThat(responseClose.getError(), is(notNullValue()));
- assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.DECODE_ERROR));
+ .getLatestResponse();
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getBody(), is(instanceOf(ErrorCarryingFrameBody.class)));
+ final Error error = ((ErrorCarryingFrameBody) response.getBody()).getError();
+ if (error != null)
+ {
+ assertThat(error.getCondition(), anyOf(equalTo(AmqpError.DECODE_ERROR), equalTo(AmqpError.INVALID_FIELD)));
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org