You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/07 15:39:37 UTC
[2/2] qpid-broker-j git commit: QPID-7749: [Java Broker] [AMQP 1.0]
Settle incoming messages when "rcv-settle-mode" is default
QPID-7749: [Java Broker] [AMQP 1.0] Settle incoming messages when "rcv-settle-mode" is default
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/aa1f6ff0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/aa1f6ff0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/aa1f6ff0
Branch: refs/heads/master
Commit: aa1f6ff0fd1badd1478eb6ecadcea1eff52cbc1c
Parents: 6ee02c1
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jun 7 15:14:41 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jun 7 16:38:55 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AbstractLinkEndpoint.java | 4 +-
.../v1_0/AbstractReceivingLinkEndpoint.java | 12 ---
.../protocol/v1_0/ErrantLinkEndpoint.java | 14 +++
.../qpid/server/protocol/v1_0/LinkEndpoint.java | 6 ++
.../v1_0/StandardReceivingLinkEndpoint.java | 71 ++++++++++++-
.../tests/protocol/v1_0/FrameTransport.java | 14 ++-
.../apache/qpid/tests/protocol/v1_0/Utils.java | 37 +++++++
.../protocol/v1_0/messaging/TransferTest.java | 102 ++++++++++++++++++-
.../protocol/v1_0/transport/link/FlowTest.java | 33 +-----
9 files changed, 239 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 87dc899..00f8724 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -343,7 +343,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
detach(error, true);
}
- private void detach(Error error, boolean close)
+ protected void detach(Error error, boolean close)
{
//TODO
switch (_state)
@@ -474,11 +474,13 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
return _link;
}
+ @Override
public SenderSettleMode getSendingSettlementMode()
{
return _sendingSettlementMode;
}
+ @Override
public ReceiverSettleMode getReceivingSettlementMode()
{
return _receivingSettlementMode;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 22c8b3f..4b55186 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -54,7 +54,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
{
UnsignedInteger _deliveryId;
- int _credit = 1;
boolean _settled;
private TransientState(final UnsignedInteger transferId)
@@ -62,16 +61,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
_deliveryId = transferId;
}
- void incrementCredit()
- {
- _credit++;
- }
-
- public int getCredit()
- {
- return _credit;
- }
-
public UnsignedInteger getDeliveryId()
{
return _deliveryId;
@@ -134,7 +123,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
else
{
transientState = _unsettledIds.get(deliveryTag);
- transientState.incrementCredit();
if (delivery.isSettled())
{
transientState.setSettled(true);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
index d7a7667..0d9daa7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
@@ -28,7 +28,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> implements LinkEndpoint<S, T>
{
@@ -124,6 +126,18 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
}
@Override
+ public SenderSettleMode getSendingSettlementMode()
+ {
+ return null;
+ }
+
+ @Override
+ public ReceiverSettleMode getReceivingSettlementMode()
+ {
+ return null;
+ }
+
+ @Override
public void remoteDetached(final Detach detach)
{
// ignore
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index da8c0cb..61a0199 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -28,7 +28,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
{
@@ -67,4 +69,8 @@ public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
void destroy();
void close(Error error);
+
+ SenderSettleMode getSendingSettlementMode();
+
+ ReceiverSettleMode getReceivingSettlementMode();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 7d837d5..0282813 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -93,6 +93,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
setCreditWindow();
}
+
private TerminusDurability getDurability()
{
return getTarget().getDurable();
@@ -106,6 +107,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
final Binary deliveryTag = xfr.getDeliveryTag();
UnsignedInteger messageFormat = null;
+ ReceiverSettleMode transferReceiverSettleMode = null;
+ Error error = null;
if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
{
_incompleteMessage = new ArrayList<>();
@@ -124,12 +127,26 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
fragments = new ArrayList<>(_incompleteMessage.size());
- for(Transfer t : _incompleteMessage)
+ for (Transfer t : _incompleteMessage)
{
- if(t.getMessageFormat() != null && messageFormat == null)
+ if (t.getMessageFormat() != null && messageFormat == null)
{
messageFormat = t.getMessageFormat();
}
+
+ if (t.getRcvSettleMode() != null)
+ {
+ if (transferReceiverSettleMode == null)
+ {
+ transferReceiverSettleMode = t.getRcvSettleMode();
+ }
+ else if (!transferReceiverSettleMode.equals(t.getRcvSettleMode()))
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
+ break;
+ }
+ }
fragments.addAll(t.getPayload());
t.dispose();
}
@@ -142,16 +159,33 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
_messageDeliveryTag = deliveryTag;
fragments = xfr.getPayload();
messageFormat = xfr.getMessageFormat();
-
+ transferReceiverSettleMode = xfr.getRcvSettleMode();
xfr.dispose();
}
+ if (error == null && !ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
+ && ReceiverSettleMode.SECOND.equals(transferReceiverSettleMode))
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"rcv-settle-mode\" cannot be \"second\" when link \"rcv-settle-mode\" is set to \"first\".");
+
+ }
+
+ if (error != null)
+ {
+ for (QpidByteBuffer fragment : fragments)
+ {
+ fragment.dispose();
+ }
+ return error;
+ }
+
if(_resumedMessage)
{
if(_unsettledMap.containsKey(_messageDeliveryTag))
{
Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
- boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+ boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
if(settled)
{
@@ -279,7 +313,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
- boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode() );
+ boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
if (!settled)
{
@@ -325,6 +359,16 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
return null;
}
+ private boolean shouldReceiverSettleFirst(ReceiverSettleMode transferReceiverSettleMode)
+ {
+ if (transferReceiverSettleMode == null)
+ {
+ transferReceiverSettleMode = getReceivingSettlementMode();
+ }
+
+ return transferReceiverSettleMode == null || ReceiverSettleMode.FIRST.equals(transferReceiverSettleMode);
+ }
+
@Override
protected void remoteDetachedPerformDetach(Detach detach)
{
@@ -599,4 +643,21 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
attachReceived(attach);
}
+
+ @Override
+ protected void detach(Error error, boolean close)
+ {
+ super.detach(error, close);
+
+ if (_incompleteMessage != null)
+ {
+ for (Transfer t : _incompleteMessage)
+ {
+ t.dispose();
+ }
+ _incompleteMessage = null;
+ }
+ _messageDeliveryTag = null;
+ _resumedMessage = false;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 14b28a6..47872f0 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -232,11 +232,16 @@ public class FrameTransport implements AutoCloseable
public void doAttachReceivingLink(String queueName) throws Exception
{
+ doAttachReceivingLink(UnsignedInteger.ZERO, queueName);
+ }
+
+ public void doAttachReceivingLink(final UnsignedInteger handle, String queueName) throws Exception
+ {
doBeginSession();
Role localRole = Role.RECEIVER;
Attach attach = new Attach();
attach.setName("testReceivingLink");
- attach.setHandle(new UnsignedInteger(0));
+ attach.setHandle(handle);
attach.setRole(localRole);
Source source = new Source();
source.setAddress(queueName);
@@ -256,7 +261,6 @@ public class FrameTransport implements AutoCloseable
public void doAttachSendingLink(final UnsignedInteger handle,
final String destination) throws Exception
{
- doBeginSession();
Attach attach = new Attach();
attach.setName("testSendingLink");
attach.setHandle(handle);
@@ -267,6 +271,12 @@ public class FrameTransport implements AutoCloseable
Target target = new Target();
target.setAddress(destination);
attach.setTarget(target);
+ doAttachSendingLink(attach);
+ }
+
+ public void doAttachSendingLink(final Attach attach) throws Exception
+ {
+ doBeginSession();
sendPerformative(attach);
PerformativeResponse response = (PerformativeResponse) getNextResponse();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 5ecc9e1..a9491e3 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -27,12 +27,16 @@ import static org.hamcrest.Matchers.is;
import java.net.InetSocketAddress;
+import org.hamcrest.core.Is;
+
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class Utils
{
@@ -78,4 +82,37 @@ public class Utils
return queueExists;
}
}
+
+ public static Object receiveMessage(final InetSocketAddress brokerAddress,
+ final String queueName) throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(brokerAddress))
+ {
+ transport.doAttachReceivingLink(queueName);
+ Flow flow = new Flow();
+ flow.setIncomingWindow(UnsignedInteger.ONE);
+ flow.setNextIncomingId(UnsignedInteger.ZERO);
+ flow.setOutgoingWindow(UnsignedInteger.ZERO);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setHandle(UnsignedInteger.ZERO);
+ flow.setLinkCredit(UnsignedInteger.ONE);
+
+ transport.sendPerformative(flow);
+
+ MessageDecoder messageDecoder = new MessageDecoder();
+ boolean hasMore;
+ do
+ {
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+ assertThat(response, Is.is(notNullValue()));
+ assertThat(response.getFrameBody(), Is.is(instanceOf(Transfer.class)));
+ Transfer responseTransfer = (Transfer) response.getFrameBody();
+ messageDecoder.addTransfer(responseTransfer);
+ hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
+ }
+ while (hasMore);
+
+ return messageDecoder.getData();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index f3d939f..3e57371 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -37,6 +37,7 @@ import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -46,9 +47,12 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
@@ -93,21 +97,23 @@ public class TransferTest extends ProtocolTestBase
}
@Test
- @Ignore("QPID-7749")
@SpecificationTest(section = "2.6.12",
description = "Transferring A Message.")
- public void transfer() throws Exception
+ public void transferUnsettled() throws Exception
{
+ String sentData = "foo";
try (FrameTransport transport = new FrameTransport(_brokerAddress))
{
final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME);
MessageEncoder messageEncoder = new MessageEncoder();
- messageEncoder.addData("foo");
+ messageEncoder.addData(sentData);
Transfer transfer = new Transfer();
transfer.setHandle(linkHandle);
+ transfer.setDeliveryId(UnsignedInteger.ZERO);
+ transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
transfer.setPayload(messageEncoder.getPayload());
transport.sendPerformative(transfer);
@@ -119,8 +125,96 @@ public class TransferTest extends ProtocolTestBase
assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+ }
+ }
- transport.assertNoMoreResponses();
+ @Test
+ @SpecificationTest(section = "2.7.5",
+ description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
+ public void transferReceiverSettleModeFirst() throws Exception
+ {
+ String sentData = "foo";
+ try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+ Attach attach = new Attach();
+ attach.setName("testSendingLink");
+ attach.setHandle(linkHandle);
+ attach.setRole(Role.SENDER);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ attach.setRcvSettleMode(ReceiverSettleMode.SECOND);
+ Source source = new Source();
+ attach.setSource(source);
+ Target target = new Target();
+ target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+ attach.setTarget(target);
+
+ transport.doAttachSendingLink(attach);
+
+ MessageEncoder messageEncoder = new MessageEncoder();
+ messageEncoder.addData(sentData);
+
+ Transfer transfer = new Transfer();
+ transfer.setHandle(linkHandle);
+ transfer.setDeliveryId(UnsignedInteger.ZERO);
+ transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+ transfer.setPayload(messageEncoder.getPayload());
+ transfer.setRcvSettleMode(ReceiverSettleMode.FIRST);
+
+ transport.sendPerformative(transfer);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+ Disposition responseDisposition = (Disposition) response.getFrameBody();
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+ assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.7.5",
+ description = "If the negotiated link value is first, then it is illegal to set this field to second.")
+ public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception
+ {
+ String sentData = "foo";
+ try (FrameTransport transport = new FrameTransport(_brokerAddress))
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+ Attach attach = new Attach();
+ attach.setName("testSendingLink");
+ attach.setHandle(linkHandle);
+ attach.setRole(Role.SENDER);
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ attach.setRcvSettleMode(ReceiverSettleMode.FIRST);
+ Source source = new Source();
+ attach.setSource(source);
+ Target target = new Target();
+ target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+ attach.setTarget(target);
+
+ transport.doAttachSendingLink(attach);
+
+ MessageEncoder messageEncoder = new MessageEncoder();
+ messageEncoder.addData(sentData);
+
+ Transfer transfer = new Transfer();
+ transfer.setHandle(linkHandle);
+ transfer.setDeliveryId(UnsignedInteger.ZERO);
+ transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+ transfer.setPayload(messageEncoder.getPayload());
+ transfer.setRcvSettleMode(ReceiverSettleMode.SECOND);
+
+ transport.sendPerformative(transfer);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Detach.class)));
+ Detach detach = (Detach) response.getFrameBody();
+ Error error = detach.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 722432a..9b4aa9c 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -36,13 +36,12 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
public class FlowTest extends ProtocolTestBase
{
@@ -138,35 +137,9 @@ public class FlowTest extends ProtocolTestBase
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "foo");
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
- try (FrameTransport transport = new FrameTransport(addr))
- {
- transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
- Flow flow = new Flow();
- flow.setIncomingWindow(UnsignedInteger.ONE);
- flow.setNextIncomingId(UnsignedInteger.ZERO);
- flow.setOutgoingWindow(UnsignedInteger.ZERO);
- flow.setNextOutgoingId(UnsignedInteger.ZERO);
- flow.setHandle(UnsignedInteger.ZERO); // TODO
- flow.setLinkCredit(UnsignedInteger.ONE);
- transport.sendPerformative(flow);
-
- MessageDecoder messageDecoder = new MessageDecoder();
- boolean hasMore;
- do
- {
- PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
- assertThat(response, is(notNullValue()));
- assertThat(response.getFrameBody(), is(instanceOf(Transfer.class)));
- Transfer responseTransfer = (Transfer) response.getFrameBody();
- messageDecoder.addTransfer(responseTransfer);
- hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
- }
- while (hasMore);
-
- String data = (String) messageDecoder.getData();
- assertThat(data, is(equalTo("foo")));
- }
+ String data = (String) Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(data, is(equalTo("foo")));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org