You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/07 15:39:37 UTC

[2/2] qpid-broker-j git commit: QPID-7749: [Java Broker] [AMQP 1.0] Settle incoming messages when "rcv-settle-mode" is default

QPID-7749: [Java Broker] [AMQP 1.0] Settle incoming messages when "rcv-settle-mode" is default


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/aa1f6ff0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/aa1f6ff0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/aa1f6ff0

Branch: refs/heads/master
Commit: aa1f6ff0fd1badd1478eb6ecadcea1eff52cbc1c
Parents: 6ee02c1
Author: Alex Rudyy <or...@apache.org>
Authored: Wed Jun 7 15:14:41 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jun 7 16:38:55 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AbstractLinkEndpoint.java     |   4 +-
 .../v1_0/AbstractReceivingLinkEndpoint.java     |  12 ---
 .../protocol/v1_0/ErrantLinkEndpoint.java       |  14 +++
 .../qpid/server/protocol/v1_0/LinkEndpoint.java |   6 ++
 .../v1_0/StandardReceivingLinkEndpoint.java     |  71 ++++++++++++-
 .../tests/protocol/v1_0/FrameTransport.java     |  14 ++-
 .../apache/qpid/tests/protocol/v1_0/Utils.java  |  37 +++++++
 .../protocol/v1_0/messaging/TransferTest.java   | 102 ++++++++++++++++++-
 .../protocol/v1_0/transport/link/FlowTest.java  |  33 +-----
 9 files changed, 239 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 87dc899..00f8724 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -343,7 +343,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         detach(error, true);
     }
 
-    private void detach(Error error, boolean close)
+    protected void detach(Error error, boolean close)
     {
         //TODO
         switch (_state)
@@ -474,11 +474,13 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         return _link;
     }
 
+    @Override
     public SenderSettleMode getSendingSettlementMode()
     {
         return _sendingSettlementMode;
     }
 
+    @Override
     public ReceiverSettleMode getReceivingSettlementMode()
     {
         return _receivingSettlementMode;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 22c8b3f..4b55186 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -54,7 +54,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
     {
 
         UnsignedInteger _deliveryId;
-        int _credit = 1;
         boolean _settled;
 
         private TransientState(final UnsignedInteger transferId)
@@ -62,16 +61,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             _deliveryId = transferId;
         }
 
-        void incrementCredit()
-        {
-            _credit++;
-        }
-
-        public int getCredit()
-        {
-            return _credit;
-        }
-
         public UnsignedInteger getDeliveryId()
         {
             return _deliveryId;
@@ -134,7 +123,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             else
             {
                 transientState = _unsettledIds.get(deliveryTag);
-                transientState.incrementCredit();
                 if (delivery.isSettled())
                 {
                     transientState.setSettled(true);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
index d7a7667..0d9daa7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
@@ -28,7 +28,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 
 public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> implements LinkEndpoint<S, T>
 {
@@ -124,6 +126,18 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
     }
 
     @Override
+    public SenderSettleMode getSendingSettlementMode()
+    {
+        return null;
+    }
+
+    @Override
+    public ReceiverSettleMode getReceivingSettlementMode()
+    {
+        return null;
+    }
+
+    @Override
     public void remoteDetached(final Detach detach)
     {
         // ignore

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index da8c0cb..61a0199 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -28,7 +28,9 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 
 public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
 {
@@ -67,4 +69,8 @@ public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
     void destroy();
 
     void close(Error error);
+
+    SenderSettleMode getSendingSettlementMode();
+
+    ReceiverSettleMode getReceivingSettlementMode();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 7d837d5..0282813 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -93,6 +93,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         setCreditWindow();
     }
 
+
     private TerminusDurability getDurability()
     {
         return getTarget().getDurable();
@@ -106,6 +107,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
         final Binary deliveryTag = xfr.getDeliveryTag();
         UnsignedInteger messageFormat = null;
+        ReceiverSettleMode transferReceiverSettleMode = null;
+        Error error = null;
         if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
         {
             _incompleteMessage = new ArrayList<>();
@@ -124,12 +127,26 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
             fragments = new ArrayList<>(_incompleteMessage.size());
 
-            for(Transfer t : _incompleteMessage)
+            for (Transfer t : _incompleteMessage)
             {
-                if(t.getMessageFormat() != null && messageFormat == null)
+                if (t.getMessageFormat() != null && messageFormat == null)
                 {
                     messageFormat = t.getMessageFormat();
                 }
+
+                if (t.getRcvSettleMode() != null)
+                {
+                    if (transferReceiverSettleMode == null)
+                    {
+                        transferReceiverSettleMode = t.getRcvSettleMode();
+                    }
+                    else if (!transferReceiverSettleMode.equals(t.getRcvSettleMode()))
+                    {
+                        error = new Error(AmqpError.INVALID_FIELD,
+                                          "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
+                        break;
+                    }
+                }
                 fragments.addAll(t.getPayload());
                 t.dispose();
             }
@@ -142,16 +159,33 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
             _messageDeliveryTag = deliveryTag;
             fragments = xfr.getPayload();
             messageFormat = xfr.getMessageFormat();
-
+            transferReceiverSettleMode = xfr.getRcvSettleMode();
             xfr.dispose();
         }
 
+        // AMQP 1.0 section 2.7.5: a transfer must not request "second" unless the link itself is in "second" mode.
+        if (error == null && !ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
+            && ReceiverSettleMode.SECOND.equals(transferReceiverSettleMode))
+        {
+            error = new Error(AmqpError.INVALID_FIELD,
+                              "Transfer \"rcv-settle-mode\" cannot be \"second\" when link \"rcv-settle-mode\" is set to \"first\".");
+        }
+
+        if (error != null)
+        {
+            for (QpidByteBuffer fragment : fragments)
+            {
+                fragment.dispose();
+            }
+            return error;
+        }
+
         if(_resumedMessage)
         {
             if(_unsettledMap.containsKey(_messageDeliveryTag))
             {
                 Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
-                boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode());
+                boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
                 updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
                 if(settled)
                 {
@@ -279,7 +313,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     }
 
 
-                    boolean settled = ReceiverSettleMode.FIRST.equals(getReceivingSettlementMode()                                                             );
+                    boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
 
                     if (!settled)
                     {
@@ -325,6 +359,16 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         return null;
     }
 
+    private boolean shouldReceiverSettleFirst(ReceiverSettleMode transferReceiverSettleMode)
+    {
+        if (transferReceiverSettleMode == null)
+        {
+            transferReceiverSettleMode = getReceivingSettlementMode();
+        }
+        // A mode left unset on both the transfer and the link is treated as "first": settle on arrival.
+        return transferReceiverSettleMode == null || ReceiverSettleMode.FIRST.equals(transferReceiverSettleMode);
+    }
+
     @Override
     protected void remoteDetachedPerformDetach(Detach detach)
     {
@@ -599,4 +643,21 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
         attachReceived(attach);
     }
+
+    @Override
+    protected void detach(Error error, boolean close)
+    {
+        super.detach(error, close);
+        // After detaching, dispose any buffered transfers of a partially received message and reset delivery state.
+        if (_incompleteMessage != null)
+        {
+            for (Transfer t : _incompleteMessage)
+            {
+                t.dispose();
+            }
+            _incompleteMessage = null;
+        }
+        _messageDeliveryTag = null;
+        _resumedMessage = false;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 14b28a6..47872f0 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -232,11 +232,16 @@ public class FrameTransport implements AutoCloseable
 
     public void doAttachReceivingLink(String queueName) throws Exception
     {
+        doAttachReceivingLink(UnsignedInteger.ZERO, queueName);
+    }
+
+    public void doAttachReceivingLink(final UnsignedInteger handle, String queueName) throws Exception
+    {
         doBeginSession();
         Role localRole = Role.RECEIVER;
         Attach attach = new Attach();
         attach.setName("testReceivingLink");
-        attach.setHandle(new UnsignedInteger(0));
+        attach.setHandle(handle);
         attach.setRole(localRole);
         Source source = new Source();
         source.setAddress(queueName);
@@ -256,7 +261,6 @@ public class FrameTransport implements AutoCloseable
     public void doAttachSendingLink(final UnsignedInteger handle,
                                     final String destination) throws Exception
     {
-        doBeginSession();
         Attach attach = new Attach();
         attach.setName("testSendingLink");
         attach.setHandle(handle);
@@ -267,6 +271,12 @@ public class FrameTransport implements AutoCloseable
         Target target = new Target();
         target.setAddress(destination);
         attach.setTarget(target);
+        doAttachSendingLink(attach);
+    }
+
+    public void doAttachSendingLink(final Attach attach) throws Exception
+    {
+        doBeginSession();
 
         sendPerformative(attach);
         PerformativeResponse response = (PerformativeResponse) getNextResponse();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 5ecc9e1..a9491e3 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -27,12 +27,16 @@ import static org.hamcrest.Matchers.is;
 
 import java.net.InetSocketAddress;
 
+import org.hamcrest.core.Is;
+
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class Utils
 {
@@ -78,4 +82,37 @@ public class Utils
             return queueExists;
         }
     }
+
+    public static Object receiveMessage(final InetSocketAddress brokerAddress,
+                                        final String queueName) throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(brokerAddress))
+        {
+            transport.doAttachReceivingLink(queueName);
+            Flow flow = new Flow();
+            flow.setIncomingWindow(UnsignedInteger.ONE);
+            flow.setNextIncomingId(UnsignedInteger.ZERO);
+            flow.setOutgoingWindow(UnsignedInteger.ZERO);
+            flow.setNextOutgoingId(UnsignedInteger.ZERO);
+            flow.setHandle(UnsignedInteger.ZERO);
+            flow.setLinkCredit(UnsignedInteger.ONE);
+
+            transport.sendPerformative(flow);
+
+            MessageDecoder messageDecoder = new MessageDecoder();
+            boolean hasMore;
+            do
+            {
+                PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+                assertThat(response, Is.is(notNullValue()));
+                assertThat(response.getFrameBody(), Is.is(instanceOf(Transfer.class)));
+                Transfer responseTransfer = (Transfer) response.getFrameBody();
+                messageDecoder.addTransfer(responseTransfer);
+                hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
+            }
+            while (hasMore);
+
+            return messageDecoder.getData();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index f3d939f..3e57371 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -37,6 +37,7 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
@@ -46,9 +47,12 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
@@ -93,21 +97,23 @@ public class TransferTest extends ProtocolTestBase
     }
 
     @Test
-    @Ignore("QPID-7749")
     @SpecificationTest(section = "2.6.12",
             description = "Transferring A Message.")
-    public void transfer() throws Exception
+    public void transferUnsettled() throws Exception
     {
+        String sentData = "foo";
         try (FrameTransport transport = new FrameTransport(_brokerAddress))
         {
             final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
             transport.doAttachSendingLink(linkHandle, BrokerAdmin.TEST_QUEUE_NAME);
 
             MessageEncoder messageEncoder = new MessageEncoder();
-            messageEncoder.addData("foo");
+            messageEncoder.addData(sentData);
 
             Transfer transfer = new Transfer();
             transfer.setHandle(linkHandle);
+            transfer.setDeliveryId(UnsignedInteger.ZERO);
+            transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
             transfer.setPayload(messageEncoder.getPayload());
 
             transport.sendPerformative(transfer);
@@ -119,8 +125,96 @@ public class TransferTest extends ProtocolTestBase
             assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
             assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
             assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+        }
+    }
 
-            transport.assertNoMoreResponses();
+    @Test
+    @SpecificationTest(section = "2.7.5",
+            description = "If first, this indicates that the receiver MUST settle the delivery once it has arrived without waiting for the sender to settle first")
+    public void transferReceiverSettleModeFirst() throws Exception
+    {
+        String sentData = "foo";
+        try (FrameTransport transport = new FrameTransport(_brokerAddress))
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+            Attach attach = new Attach();
+            attach.setName("testSendingLink");
+            attach.setHandle(linkHandle);
+            attach.setRole(Role.SENDER);
+            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+            attach.setRcvSettleMode(ReceiverSettleMode.SECOND);
+            Source source = new Source();
+            attach.setSource(source);
+            Target target = new Target();
+            target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+            attach.setTarget(target);
+
+            transport.doAttachSendingLink(attach);
+
+            MessageEncoder messageEncoder = new MessageEncoder();
+            messageEncoder.addData(sentData);
+
+            Transfer transfer = new Transfer();
+            transfer.setHandle(linkHandle);
+            transfer.setDeliveryId(UnsignedInteger.ZERO);
+            transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+            transfer.setPayload(messageEncoder.getPayload());
+            transfer.setRcvSettleMode(ReceiverSettleMode.FIRST);
+
+            transport.sendPerformative(transfer);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Disposition.class)));
+            Disposition responseDisposition = (Disposition) response.getFrameBody();
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), is(instanceOf(Accepted.class)));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.5",
+            description = "If the negotiated link value is first, then it is illegal to set this field to second.")
+    public void transferReceiverSettleModeCannotBeSecondWhenLinkModeIsFirst() throws Exception
+    {
+        String sentData = "foo";
+        try (FrameTransport transport = new FrameTransport(_brokerAddress))
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ZERO;
+            Attach attach = new Attach();
+            attach.setName("testSendingLink");
+            attach.setHandle(linkHandle);
+            attach.setRole(Role.SENDER);
+            attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+            attach.setRcvSettleMode(ReceiverSettleMode.FIRST);
+            Source source = new Source();
+            attach.setSource(source);
+            Target target = new Target();
+            target.setAddress(BrokerAdmin.TEST_QUEUE_NAME);
+            attach.setTarget(target);
+
+            transport.doAttachSendingLink(attach);
+
+            MessageEncoder messageEncoder = new MessageEncoder();
+            messageEncoder.addData(sentData);
+
+            Transfer transfer = new Transfer();
+            transfer.setHandle(linkHandle);
+            transfer.setDeliveryId(UnsignedInteger.ZERO);
+            transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
+            transfer.setPayload(messageEncoder.getPayload());
+            transfer.setRcvSettleMode(ReceiverSettleMode.SECOND);
+
+            transport.sendPerformative(transfer);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Detach.class)));
+            Detach detach = (Detach) response.getFrameBody();
+            Error error = detach.getError();
+            assertThat(error, is(notNullValue()));
+            assertThat(error.getCondition(), is(equalTo(AmqpError.INVALID_FIELD)));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/aa1f6ff0/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index 722432a..9b4aa9c 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -36,13 +36,12 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
-import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
 import org.apache.qpid.tests.protocol.v1_0.PerformativeResponse;
 import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class FlowTest extends ProtocolTestBase
 {
@@ -138,35 +137,9 @@ public class FlowTest extends ProtocolTestBase
         getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
         getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, "foo");
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
-        try (FrameTransport transport = new FrameTransport(addr))
-        {
-            transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
-            Flow flow = new Flow();
-            flow.setIncomingWindow(UnsignedInteger.ONE);
-            flow.setNextIncomingId(UnsignedInteger.ZERO);
-            flow.setOutgoingWindow(UnsignedInteger.ZERO);
-            flow.setNextOutgoingId(UnsignedInteger.ZERO);
-            flow.setHandle(UnsignedInteger.ZERO); // TODO
-            flow.setLinkCredit(UnsignedInteger.ONE);
 
-            transport.sendPerformative(flow);
-
-            MessageDecoder messageDecoder = new MessageDecoder();
-            boolean hasMore;
-            do
-            {
-                PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
-                assertThat(response, is(notNullValue()));
-                assertThat(response.getFrameBody(), is(instanceOf(Transfer.class)));
-                Transfer responseTransfer = (Transfer) response.getFrameBody();
-                messageDecoder.addTransfer(responseTransfer);
-                hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
-            }
-            while (hasMore);
-
-            String data = (String) messageDecoder.getData();
-            assertThat(data, is(equalTo("foo")));
-        }
+        String data = (String) Utils.receiveMessage(addr, BrokerAdmin.TEST_QUEUE_NAME);
+        assertThat(data, is(equalTo("foo")));
     }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org