You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/28 15:47:57 UTC

[1/3] qpid-broker-j git commit: QPID-7842 : [AMQP 1.0] Refactor transfer functionality

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master d604344c2 -> 2e8efc0a9


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
new file mode 100644
index 0000000..7168c47
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -0,0 +1,499 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.transaction;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+
+public class TransactionalTransferTest extends ProtocolTestBase
+{
+    private static final String TEST_MESSAGE_CONTENT = "testMessageContent"; // payload sent or enqueued, and expected back, by the tests in this class
+    private InetSocketAddress _brokerAddress; // assigned in setUp(); every FrameTransport in this class connects to it
+
+    @Before // JUnit 4: runs before each test method
+    public void setUp()
+    {
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME); // every test in this class sends to or receives from this queue
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); // presumably the AMQP port that needs no authentication - confirm in BrokerAdmin
+    }
+
+    @Test // Transactional send: checks the broker's transactional disposition, discharges with fail=false, then reads the message back from the queue.
+    @SpecificationTest(section = "4.4.4", // NOTE(review): the quoted sentence looks like AMQP 1.0 transactions section 4.4.1 - confirm the section number
+            description = "Transactional Posting[...]the transaction controller wishes to associate an outgoing"
+                          + " transfer with a transaction, it MUST set the state of the transfer with a"
+                          + " transactional-state carrying the appropriate transaction identifier.")
+    public void sendTransactionalPostingReceiverSettlesFirst() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Disposition responseDisposition = interaction.negotiateProtocol()
+                                                         .consumeResponse()
+                                                         .open()
+                                                         .consumeResponse(Open.class)
+                                                         .begin()
+                                                         .consumeResponse(Begin.class)
+
+                                                         .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                                                         .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                                                         .attachRole(Role.SENDER)
+                                                         .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                         .attachHandle(linkHandle)
+                                                         .attach().consumeResponse(Attach.class) // no rcv-settle-mode is set on this attach; presumably the default (first) applies, per the method name
+                                                         .consumeResponse(Flow.class)
+
+                                                         .transferHandle(linkHandle)
+                                                         .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                         .transferTransactionalState(txnState.getCurrentTransactionId()) // associates the transfer with the declared transaction
+                                                         .transfer()
+                                                         .consumeResponse(Disposition.class)
+                                                         .getLatestResponse(Disposition.class);
+
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE)); // the broker is expected to settle in its first disposition
+            assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+            interaction.txnDischarge(txnState, false); // second argument is presumably the discharge "fail" flag (false = commit) - confirm in Interaction
+
+            Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); // the posted message must now be receivable from the queue
+            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+        }
+    }
+
+    @Test // Transactional send followed by a discharge with fail=true: the queue is expected to stay empty.
+    @SpecificationTest(section = "4.4.4", // NOTE(review): the quoted sentence looks like AMQP 1.0 transactions section 4.4.1 - confirm the section number
+            description = "Transactional Posting[...]the transaction controller wishes to associate an outgoing"
+                          + " transfer with a transaction, it MUST set the state of the transfer with a"
+                          + " transactional-state carrying the appropriate transaction identifier.")
+    public void sendTransactionalPostingDischargeFail() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Disposition responseDisposition = interaction.negotiateProtocol()
+                                                         .consumeResponse()
+                                                         .open()
+                                                         .consumeResponse(Open.class)
+                                                         .begin()
+                                                         .consumeResponse(Begin.class)
+
+                                                         .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                                                         .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                                                         .attachRole(Role.SENDER)
+                                                         .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                         .attachHandle(linkHandle)
+                                                         .attach().consumeResponse(Attach.class)
+                                                         .consumeResponse(Flow.class)
+
+                                                         .transferHandle(linkHandle)
+                                                         .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                         .transferTransactionalState(txnState.getCurrentTransactionId()) // associates the transfer with the declared transaction
+                                                         .transfer()
+                                                         .consumeResponse(Disposition.class)
+                                                         .getLatestResponse(Disposition.class);
+
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+            assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+            interaction.txnDischarge(txnState, true); // second argument is presumably the discharge "fail" flag (true = roll back) - confirm in Interaction
+
+            assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true)); // JUnit assumption: the depth check below is skipped when the admin cannot report queue depth
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); // the posted message must not have reached the queue
+        }
+    }
+
+    @Test // Transactional send on a link attached with rcv-settle-mode SECOND: broker answers unsettled, the sender settles, then the transaction is discharged.
+    @SpecificationTest(section = "4.4.4", // NOTE(review): the quoted sentence looks like AMQP 1.0 transactions section 4.4.1 - confirm the section number
+            description = "Transactional Posting[...]the transaction controller wishes to associate an outgoing"
+                          + " transfer with a transaction, it MUST set the state of the transfer with a"
+                          + " transactional-state carrying the appropriate transaction identifier.")
+    public void sendTransactionalPostingReceiverSettlesSecond() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            Disposition responseDisposition = interaction.negotiateProtocol()
+                                                         .consumeResponse()
+                                                         .open()
+                                                         .consumeResponse(Open.class)
+                                                         .begin()
+                                                         .consumeResponse(Begin.class)
+
+                                                         .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                                                         .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                                                         .attachRole(Role.SENDER)
+                                                         .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                         .attachRcvSettleMode(ReceiverSettleMode.SECOND) // the only attach difference from the other posting tests in this class
+                                                         .attachHandle(linkHandle)
+                                                         .attach().consumeResponse(Attach.class)
+                                                         .consumeResponse(Flow.class)
+
+                                                         .transferHandle(linkHandle)
+                                                         .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                         .transferTransactionalState(txnState.getCurrentTransactionId()) // associates the transfer with the declared transaction
+                                                         .transfer()
+                                                         .consumeResponse(Disposition.class)
+                                                         .getLatestResponse(Disposition.class);
+
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.FALSE)); // with rcv-settle-mode SECOND the broker's disposition is expected to be unsettled
+            assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+            interaction.dispositionRole(Role.SENDER) // the sender settles the delivery itself, inside the same transaction
+                       .dispositionSettled(true)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .disposition();
+
+            interaction.txnDischarge(txnState, false); // second argument is presumably the discharge "fail" flag (false = commit); nothing is asserted afterwards
+        }
+    }
+
+    @Test // Receives a pre-enqueued message, settles it with a transactional Accepted disposition, then discharges with fail=false.
+    @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
+                                                        + " wish to associate the outcome of a delivery with a transaction.")
+    public void receiveTransactionalRetirementReceiverSettleFirst() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT); // the message to be consumed by this test
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                       .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE) // link credit of one, matching the single enqueued message
+                       .flowHandleFromLinkHandle()
+                       .flow()
+
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) // ties the Accepted outcome to the declared transaction
+                       .disposition()
+                       .txnDischarge(txnState, false); // second argument is presumably the discharge "fail" flag (false = commit); nothing is asserted afterwards
+        }
+    }
+
+    @Test // Receives a message, accepts it transactionally, discharges with fail=true, and expects the message to be receivable from the queue again.
+    @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
+                                                        + " wish to associate the outcome of a delivery with a transaction.")
+    public void receiveTransactionalRetirementDischargeFail() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT); // the message to be consumed by this test
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                       .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE) // link credit of one, matching the single enqueued message
+                       .flowHandleFromLinkHandle()
+                       .flow()
+
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted()) // ties the Accepted outcome to the declared transaction
+                       .disposition()
+                       .txnDischarge(txnState, true); // second argument is presumably the discharge "fail" flag (true = roll back) - confirm in Interaction
+
+            Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME); // the failed discharge must leave the message available on the queue
+            assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+        }
+    }
+
+    @Ignore("TODO disposition is currently not being sent by Broker") // disabled: the test waits for a broker Disposition that, per this TODO, is not sent yet
+    @Test // rcv-settle-mode SECOND retirement: the receiver sends an unsettled transactional Accepted and expects a settled transactional disposition back.
+    @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
+                                                        + " wish to associate the outcome of a delivery with a transaction.")
+    public void receiveTransactionalRetirementReceiverSettleSecond() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT); // the message to be consumed by this test
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                       .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE) // link credit of one, matching the single enqueued message
+                       .flowHandleFromLinkHandle()
+                       .flow()
+
+                       .receiveDelivery()
+                       .decodeLatestDelivery();
+
+            Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            Disposition settledDisposition = interaction.dispositionSettled(false) // unsettled on purpose: the broker's reply is expected to carry the settlement
+                                                    .dispositionRole(Role.RECEIVER)
+                                                    .dispositionTransactionalState(txnState.getCurrentTransactionId(),
+                                                                                   new Accepted())
+                                                    .disposition()
+                                                    .consumeResponse(Disposition.class)
+                                                    .getLatestResponse(Disposition.class);
+
+            assertThat(settledDisposition.getSettled(), is(true));
+            assertThat(settledDisposition.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) settledDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+            interaction.txnDischarge(txnState, false); // second argument is presumably the discharge "fail" flag (false = commit) - confirm in Interaction
+        }
+    }
+
+    @Test // Transactional acquisition: flow carries a "txn-id" property, the delivered transfer must carry that txn id, and after discharge with fail=false the queue is empty.
+    @SpecificationTest(section = "4.4.2", description = "Transactional Acquisition[...]In the case of the flow frame," // NOTE(review): the quoted text looks like AMQP 1.0 transactions section 4.4.3 - confirm the section number
+                                                        + " the transactional work is not necessarily directly"
+                                                        + " initiated or entirely determined when the flow frame"
+                                                        + " arrives at the resource, but can in fact occur at some"
+                                                        + " later point and in ways not necessarily"
+                                                        + " anticipated by the controller.")
+    public void receiveTransactionalAcquisitionReceiverSettleFirst() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT); // the message to be acquired by this test
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                       .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE) // link credit of one, matching the single enqueued message
+                       .flowHandleFromLinkHandle()
+                       .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId())) // the flow itself is tagged with the transaction id
+                       .flow()
+
+                       .receiveDelivery();
+
+            List<Transfer> transfers = interaction.getLatestDelivery();
+            assertThat(transfers.size(), is(equalTo(1)));
+            Transfer transfer = transfers.get(0);
+            assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); // the delivery must reference the same transaction as the flow
+
+            Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .disposition()
+                       .txnDischarge(txnState, false); // second argument is presumably the discharge "fail" flag (false = commit) - confirm in Interaction
+
+            assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true)); // JUnit assumption: the depth check below is skipped when the admin cannot report queue depth
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0))); // the message is expected to be gone from the queue
+        }
+    }
+
+    @Test // Transactional acquisition followed by a discharge with fail=true: the message is expected to remain on the queue.
+    @SpecificationTest(section = "4.4.2", description = "Transactional Acquisition[...]In the case of the flow frame," // NOTE(review): the quoted text looks like AMQP 1.0 transactions section 4.4.3 - confirm the section number
+                                                        + " the transactional work is not necessarily directly"
+                                                        + " initiated or entirely determined when the flow frame"
+                                                        + " arrives at the resource, but can in fact occur at some"
+                                                        + " later point and in ways not necessarily"
+                                                        + " anticipated by the controller.")
+    public void receiveTransactionalAcquisitionDischargeFail() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT); // the message to be acquired by this test
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .open()
+                       .consumeResponse(Open.class)
+                       .begin()
+                       .consumeResponse(Begin.class)
+
+                       .txnAttachCoordinatorLink(txnState) // presumably attaches the transaction coordinator link - see Interaction
+                       .txnDeclare(txnState) // presumably declares the transaction whose id is read via getCurrentTransactionId() below
+
+                       .attachRole(Role.RECEIVER)
+                       .attachHandle(UnsignedInteger.ONE)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach()
+                       .consumeResponse(Attach.class)
+
+                       .flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE) // link credit of one, matching the single enqueued message
+                       .flowHandleFromLinkHandle()
+                       .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId())) // the flow itself is tagged with the transaction id
+                       .flow()
+
+                       .receiveDelivery();
+
+            List<Transfer> transfers = interaction.getLatestDelivery();
+            assertThat(transfers.size(), is(equalTo(1)));
+            Transfer transfer = transfers.get(0);
+            assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+            assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId()))); // the delivery must reference the same transaction as the flow
+
+            Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+                       .disposition()
+                       .txnDischarge(txnState, true); // second argument is presumably the discharge "fail" flag (true = roll back) - confirm in Interaction
+
+            assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true)); // JUnit assumption: the depth check below is skipped when the admin cannot report queue depth
+            assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1))); // the failed discharge is expected to leave the message on the queue
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-7842 : [AMQP 1.0] Refactor transfer functionality

Posted by or...@apache.org.
QPID-7842 : [AMQP 1.0] Refactor transfer functionality


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2e8efc0a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2e8efc0a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2e8efc0a

Branch: refs/heads/master
Commit: 2e8efc0a9cdc25bdd52589866f13f3e1b99bb8f0
Parents: d604344
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Jun 22 15:25:01 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jun 28 16:14:05 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AbstractLinkEndpoint.java     |  62 ++-
 .../v1_0/AbstractReceivingLinkEndpoint.java     | 256 ++++++----
 .../protocol/v1_0/ConsumerTarget_1_0.java       |   6 +-
 .../qpid/server/protocol/v1_0/Delivery.java     | 144 +++++-
 .../protocol/v1_0/ErrantLinkEndpoint.java       |  12 +-
 .../qpid/server/protocol/v1_0/LinkEndpoint.java |   3 +-
 .../protocol/v1_0/SendingLinkEndpoint.java      | 197 ++++----
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 155 +++---
 .../v1_0/StandardReceivingLinkEndpoint.java     | 162 +-----
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  60 +--
 .../protocol/v1_0/codec/ValueHandler.java       |   2 +-
 .../v1_0/delivery/DeliveryRegistry.java         |  34 ++
 .../v1_0/delivery/DeliveryRegistryImpl.java     |  79 +++
 .../v1_0/delivery/UnsettledDelivery.java        |  46 ++
 .../qpid/server/protocol/v1_0/type/Outcome.java |   2 +-
 .../protocol/v1_0/type/transport/Attach.java    |  12 +-
 .../protocol/v1_0/type/transport/Begin.java     |   6 +-
 .../v1_0/type/transport/Disposition.java        |   7 +
 .../protocol/v1_0/type/transport/Flow.java      |   7 +-
 .../qpid/tests/protocol/v1_0/BrokerAdmin.java   |   4 +
 .../v1_0/EmbeddedBrokerPerClassAdminImpl.java   |  14 +
 .../v1_0/ExternalQpidBrokerAdminImpl.java       |  12 +
 .../qpid/tests/protocol/v1_0/Interaction.java   | 261 +++++++++-
 .../v1_0/InteractionTransactionalState.java     |  50 ++
 .../apache/qpid/tests/protocol/v1_0/Utils.java  |  17 +-
 .../v1_0/messaging/MultiTransferTest.java       | 413 +++++++++++++++
 .../protocol/v1_0/messaging/TransferTest.java   | 345 ++++++++++++-
 .../v1_0/transaction/DischargeTest.java         |   2 +-
 .../transaction/TransactionalTransferTest.java  | 499 +++++++++++++++++++
 29 files changed, 2272 insertions(+), 597 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 00f8724..a02a297 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -52,24 +51,22 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
     private final Link_1_0<S, T> _link;
     private final Session_1_0 _session;
+
+    // todo: remove client specific part
     private Object _flowTransactionId;
-    private SenderSettleMode _sendingSettlementMode;
-    private ReceiverSettleMode _receivingSettlementMode;
-    private Map _initialUnsettledMap;
-    private UnsignedInteger _lastSentCreditLimit;
+    private volatile SenderSettleMode _sendingSettlementMode;
+    private volatile ReceiverSettleMode _receivingSettlementMode;
+    private volatile UnsignedInteger _lastSentCreditLimit;
     private volatile boolean _stopped;
     private volatile boolean _stoppedUpdated;
-    private Symbol[] _capabilities;
-    private SequenceNumber _deliveryCount;
-    private UnsignedInteger _linkCredit;
-    private UnsignedInteger _available;
-    private Boolean _drain;
-    private UnsignedInteger _localHandle;
-    private UnsignedLong _maxMessageSize;
-    private Map<Symbol, Object> _properties;
-
-    protected volatile State _state = State.ATTACH_RECVD;
-    protected Map _localUnsettled;
+    private volatile Symbol[] _capabilities;
+    private volatile SequenceNumber _deliveryCount;
+    private volatile UnsignedInteger _linkCredit;
+    private volatile UnsignedInteger _available;
+    private volatile Boolean _drain;
+    private volatile UnsignedInteger _localHandle;
+    private volatile Map<Symbol, Object> _properties;
+    private volatile State _state = State.ATTACH_RECVD;
 
     protected enum State
     {
@@ -88,12 +85,13 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         _link = link;
     }
 
-    protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+    protected abstract void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
 
     protected abstract void remoteDetachedPerformDetach(final Detach detach);
 
     protected abstract Map<Symbol,Object> initProperties(final Attach attach);
 
+    protected abstract Map<Binary, DeliveryState> getLocalUnsettled();
 
     @Override
     public void receiveAttach(final Attach attach) throws AmqpErrorException
@@ -131,9 +129,17 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
     {
         _sendingSettlementMode = attach.getSndSettleMode();
         _receivingSettlementMode = attach.getRcvSettleMode();
-        _initialUnsettledMap = attach.getUnsettled();
         _properties = initProperties(attach);
         _state = State.ATTACH_RECVD;
+
+        if (getRole() == Role.RECEIVER)
+        {
+            getSession().getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
+        }
+        else
+        {
+            getSession().getOutgoingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
+        }
     }
 
     public boolean isStopped()
@@ -229,20 +235,16 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         }
     }
 
-    public void addUnsettled(final Delivery unsettled)
-    {
-    }
-
     @Override
-    public void receiveDeliveryState(final Delivery unsettled,
+    public void receiveDeliveryState(final Binary deliveryTag,
                                      final DeliveryState state,
                                      final Boolean settled)
     {
-        handle(unsettled.getDeliveryTag(), state, settled);
+        handleDeliveryState(deliveryTag, state, settled);
 
         if (Boolean.TRUE.equals(settled))
         {
-            settle(unsettled.getDeliveryTag());
+            settle(deliveryTag);
         }
     }
 
@@ -297,7 +299,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         attachToSend.setTarget(getTarget());
         attachToSend.setSndSettleMode(getSendingSettlementMode());
         attachToSend.setRcvSettleMode(getReceivingSettlementMode());
-        attachToSend.setUnsettled(_localUnsettled);
+        attachToSend.setUnsettled(getLocalUnsettled());
         attachToSend.setProperties(_properties);
         attachToSend.setOfferedCapabilities(_capabilities);
 
@@ -496,13 +498,6 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         _capabilities = capabilities == null ? null : capabilities.toArray(new Symbol[capabilities.size()]);
     }
 
-    public Map getInitialUnsettledMap()
-    {
-        return _initialUnsettledMap;
-    }
-
-    public abstract void initialiseUnsettled();
-
     @Override public String toString()
     {
         return "LinkEndpoint{" +
@@ -517,7 +512,6 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
                ", _available=" + _available +
                ", _drain=" + _drain +
                ", _localHandle=" + _localHandle +
-               ", _maxMessageSize=" + _maxMessageSize +
                '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 4b55186..d159de5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -35,48 +36,21 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extends AbstractLinkEndpoint<Source, T>
 {
     private final SectionDecoder _sectionDecoder;
-    private UnsignedInteger _lastDeliveryId;
-    private Map<Binary, Object> _unsettledMap = new LinkedHashMap<>();
-    private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<>();
-    private boolean _creditWindow;
-
-
-    private static class TransientState
-    {
-
-        UnsignedInteger _deliveryId;
-        boolean _settled;
-
-        private TransientState(final UnsignedInteger transferId)
-        {
-            _deliveryId = transferId;
-        }
-
-        public UnsignedInteger getDeliveryId()
-        {
-            return _deliveryId;
-        }
-
-        public boolean isSettled()
-        {
-            return _settled;
-        }
-
-        public void setSettled(boolean settled)
-        {
-            _settled = settled;
-        }
-    }
+    final Map<Binary, DeliveryState> _unsettled = Collections.synchronizedMap(new LinkedHashMap<>());
 
+    private volatile boolean _creditWindow;
+    private volatile Delivery _currentDelivery;
 
     public AbstractReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, T> link)
     {
@@ -98,73 +72,162 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
         return Role.RECEIVER;
     }
 
-    Error receiveTransfer(final Transfer transfer, final Delivery delivery)
+    void receiveTransfer(final Transfer transfer)
     {
         if(isAttached())
         {
-            TransientState transientState;
-            final Binary deliveryTag = delivery.getDeliveryTag();
-            boolean existingState = _unsettledMap.containsKey(deliveryTag);
-            if (!existingState || transfer.getState() != null)
+            if (!ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
+                && ReceiverSettleMode.SECOND.equals(transfer.getRcvSettleMode()))
             {
-                _unsettledMap.put(deliveryTag, transfer.getState());
+                Error error = new Error(AmqpError.INVALID_FIELD,
+                                  "Transfer \"rcv-settle-mode\" cannot be \"second\" when link \"rcv-settle-mode\" is set to \"first\".");
+                close(error);
+                return;
             }
-            if (!existingState)
+
+            if (_currentDelivery == null)
             {
-                transientState = new TransientState(transfer.getDeliveryId());
-                if (delivery.isSettled())
+                Error error = validateNewTransfer(transfer);
+                if (error != null)
                 {
-                    transientState.setSettled(true);
+                    close(error);
+                    return;
                 }
-                _unsettledIds.put(deliveryTag, transientState);
+                _currentDelivery = new Delivery(transfer, this);
+
                 setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
                 getDeliveryCount().incr();
+
+                getSession().getIncomingDeliveryRegistry()
+                            .addDelivery(transfer.getDeliveryId(),
+                                         new UnsettledDelivery(transfer.getDeliveryTag(), this));
             }
             else
             {
-                transientState = _unsettledIds.get(deliveryTag);
-                if (delivery.isSettled())
+                Error error = validateSubsequentTransfer(transfer);
+                if (error != null)
                 {
-                    transientState.setSettled(true);
+                    close(error);
+                    return;
                 }
+                _currentDelivery.addTransfer(transfer);
+            }
+
+            if (!_currentDelivery.getResume())
+            {
+                _unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
+            }
+            else if (!_unsettled.containsKey(_currentDelivery.getDeliveryTag()))
+            {
+                final Error error = new Error(AmqpError.ILLEGAL_STATE,
+                                              String.format("Resumed transfer with delivery tag '%s' is not found.",
+                                                            _currentDelivery.getDeliveryTag()));
+                close(error);
+                return;
             }
 
-            if (transientState.isSettled() && delivery.isComplete())
+            if (_currentDelivery.isAborted())
             {
-                _unsettledMap.remove(deliveryTag);
+                _unsettled.remove(_currentDelivery.getDeliveryTag());
+                getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
+                _currentDelivery = null;
+
+                setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+                getDeliveryCount().decr();
+            }
+            else if (_currentDelivery.isComplete())
+            {
+                try
+                {
+                    if (_currentDelivery.isSettled())
+                    {
+                        _unsettled.remove(_currentDelivery.getDeliveryTag());
+                        getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
+                    }
+                    Error error = receiveDelivery(_currentDelivery);
+                    if (error != null)
+                    {
+                        close(error);
+                    }
+                }
+                finally
+                {
+                    _currentDelivery = null;
+                }
             }
-            return messageTransfer(transfer);
         }
         else
         {
+            // TODO: this handling of a transfer received while the link is not attached is wrong and needs revisiting
             getSession().updateDisposition(Role.RECEIVER, transfer.getDeliveryId(), transfer.getDeliveryId(),null, true);
-            return null;
         }
     }
 
-    protected abstract Error messageTransfer(final Transfer transfer);
-
-    @Override public void receiveFlow(final Flow flow)
+    private Error validateNewTransfer(final Transfer transfer)
     {
-        setAvailable(flow.getAvailable());
-        setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
+        Error error = null;
+        if (transfer.getDeliveryId() == null)
+        {
+            error = new Error(AmqpError.INVALID_FIELD,
+                                    "Transfer \"delivery-id\" is required for a new delivery.");
+        }
+        else if (transfer.getDeliveryTag() == null)
+        {
+            error = new Error(AmqpError.INVALID_FIELD,
+                                    "Transfer \"delivery-tag\" is required for a new delivery.");
+        }
+        return error;
     }
 
-    public boolean settled(final Binary deliveryTag)
+    private Error validateSubsequentTransfer(final Transfer transfer)
     {
-        boolean deleted;
-        if (deleted = (_unsettledIds.remove(deliveryTag) != null))
+        Error error = null;
+        if (transfer.getDeliveryId() != null && !_currentDelivery.getDeliveryId()
+                                                                 .equals(transfer.getDeliveryId()))
         {
-            _unsettledMap.remove(deliveryTag);
-
+            error = new Error(AmqpError.INVALID_FIELD,
+                              String.format(
+                                      "Unexpected transfer \"delivery-id\" for multi-transfer delivery: found '%s', expected '%s'.",
+                                      transfer.getDeliveryId(),
+                                      _currentDelivery.getDeliveryId()));
         }
+        else if (transfer.getDeliveryTag() != null && !_currentDelivery.getDeliveryTag()
+                                                                       .equals(transfer.getDeliveryTag()))
+        {
+            error = new Error(AmqpError.INVALID_FIELD,
+                              String.format(
+                                      "Unexpected transfer \"delivery-tag\" for multi-transfer delivery: found '%s', expected '%s'.",
+                                      transfer.getDeliveryTag(),
+                                      _currentDelivery.getDeliveryTag()));
+        }
+        else if (_currentDelivery.getReceiverSettleMode() != null && transfer.getRcvSettleMode() != null
+                 && !_currentDelivery.getReceiverSettleMode().equals(transfer.getRcvSettleMode()))
+        {
+            error = new Error(AmqpError.INVALID_FIELD,
+                              "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
+        }
+        return error;
+    }
+
+    protected abstract Error receiveDelivery(final Delivery delivery);
 
-        return deleted;
+    @Override
+    public void receiveFlow(final Flow flow)
+    {
+        setAvailable(flow.getAvailable());
+        setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
+    }
+
+    private boolean settled(final Binary deliveryTag)
+    {
+        return _unsettled.remove(deliveryTag) != null;
     }
 
-    public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+    public void updateDisposition(final Binary deliveryTag,
+                                  final DeliveryState state,
+                                  final boolean settled)
     {
-        if (_unsettledMap.containsKey(deliveryTag))
+        if (_unsettled.containsKey(deliveryTag))
         {
             boolean outcomeUpdate = false;
             Outcome outcome = null;
@@ -174,27 +237,23 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             }
             else if (state instanceof TransactionalState)
             {
-                // TODO? Is this correct
                 outcome = ((TransactionalState) state).getOutcome();
             }
 
             if (outcome != null)
             {
-                Object oldOutcome = _unsettledMap.put(deliveryTag, outcome);
-                outcomeUpdate = !outcome.equals(oldOutcome);
+                if (!(_unsettled.get(deliveryTag) instanceof Outcome))
+                {
+                    Object oldOutcome = _unsettled.put(deliveryTag, outcome);
+                    outcomeUpdate = !outcome.equals(oldOutcome);
+                }
             }
 
-
-            TransientState transientState = _unsettledIds.get(deliveryTag);
             if (outcomeUpdate || settled)
             {
-
-                final UnsignedInteger transferId = transientState.getDeliveryId();
-
-                getSession().updateDisposition(getRole(), transferId, transferId, state, settled);
+                getSession().updateDisposition(getRole(), deliveryTag, state, settled);
             }
 
-
             if (settled)
             {
 
@@ -212,33 +271,28 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
                 }
             }
         }
-        else
+        else if (_creditWindow)
         {
-            TransientState transientState = _unsettledIds.get(deliveryTag);
-            if (_creditWindow)
-            {
-                setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
-                sendFlowConditional();
-            }
-
+            setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+            sendFlowConditional();
         }
     }
 
-    public void setCreditWindow()
+    void setCreditWindow()
     {
         setCreditWindow(true);
     }
 
-    public void setCreditWindow(boolean window)
+    private void setCreditWindow(boolean window)
     {
         _creditWindow = window;
         sendFlowConditional();
     }
 
     @Override
-    public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
+    public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
     {
-        super.receiveDeliveryState(unsettled, state, settled);
+        super.receiveDeliveryState(deliveryTag, state, settled);
         if(_creditWindow)
         {
             if(Boolean.TRUE.equals(settled))
@@ -258,8 +312,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
     public void settle(Binary deliveryTag)
     {
         super.settle(deliveryTag);
-        _unsettledIds.remove(deliveryTag);
-        _unsettledMap.remove(deliveryTag);
+        _unsettled.remove(deliveryTag);
         if(_creditWindow)
         {
              sendFlowConditional();
@@ -271,15 +324,32 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
     {
     }
 
-    UnsignedInteger getLastDeliveryId()
+    @Override
+    protected void detach(final Error error, final boolean close)
     {
-        return _lastDeliveryId;
+        try
+        {
+            super.detach(error, close);
+        }
+        finally
+        {
+            if (close)
+            {
+                if (_currentDelivery != null)
+                {
+                    _currentDelivery.discard();
+                    _currentDelivery = null;
+                }
+            }
+        }
     }
 
-    void setLastDeliveryId(UnsignedInteger lastDeliveryId)
+    @Override
+    protected void handleDeliveryState(Binary deliveryTag, DeliveryState state, Boolean settled)
     {
-        _lastDeliveryId = lastDeliveryId;
+        if(Boolean.TRUE.equals(settled))
+        {
+            _unsettled.remove(deliveryTag);
+        }
     }
-
-
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index a535fd2..3942ca8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -79,7 +79,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 
     private Binary _transactionId;
     private final AMQPDescribedTypeRegistry _typeRegistry;
-    private SendingLinkEndpoint _linkEndpoint;
+    private final SendingLinkEndpoint _linkEndpoint;
     private final SectionEncoder _sectionEncoder;
 
     private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
@@ -356,7 +356,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
     {
         updateNotifyWorkDesired();
 
-        if (isSuspended() && getEndpoint() != null)
+        if (_linkEndpoint != null)
         {
             _transactionId = _linkEndpoint.getTransactionId();
         }
@@ -454,7 +454,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
                             }
                             else
                             {
-                                _linkEndpoint.updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
+                                _linkEndpoint.updateDisposition(_deliveryTag, outcome, true);
                             }
                             _linkEndpoint.sendFlowConditional();
                         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index a5fb4bc..8facf8f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -19,38 +19,57 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class Delivery
 {
     private final UnsignedInteger _deliveryId;
     private final Binary _deliveryTag;
+    private final List<Transfer> _transfers = new CopyOnWriteArrayList<>();
     private final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> _linkEndpoint;
-    private boolean _complete;
-    private boolean _settled;
-    private int _numberOfTransfers = 0;
+    private final UnsignedInteger _messageFormat;
+    private volatile boolean _complete;
+    private volatile boolean _settled;
+    private volatile boolean _aborted;
+    private volatile DeliveryState _state;
+    private volatile ReceiverSettleMode _receiverSettleMode;
+    private volatile boolean _resume;
 
     public Delivery(Transfer transfer, final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint)
     {
-        _settled = Boolean.TRUE.equals(transfer.getSettled());
         _deliveryId = transfer.getDeliveryId();
         _deliveryTag = transfer.getDeliveryTag();
         _linkEndpoint = endpoint;
+        _messageFormat = transfer.getMessageFormat();
         addTransfer(transfer);
     }
 
-    public boolean isComplete()
+    public UnsignedInteger getDeliveryId()
     {
-        return _complete;
+        return _deliveryId;
+    }
+
+    public Binary getDeliveryTag()
+    {
+        return _deliveryTag;
     }
 
-    public void setComplete(final boolean complete)
+    public boolean isComplete()
     {
-        _complete = complete;
+        return _complete;
     }
 
     public boolean isSettled()
@@ -58,27 +77,93 @@ public class Delivery
         return _settled;
     }
 
-    public void setSettled(final boolean settled)
+    public boolean isAborted()
     {
-        _settled = settled;
+        return _aborted;
     }
 
-    public final void addTransfer(Transfer transfer)
+    public DeliveryState getState()
     {
-        _numberOfTransfers++;
-        if(Boolean.TRUE.equals(transfer.getAborted()) || !Boolean.TRUE.equals(transfer.getMore()))
+        return _state;
+    }
+
+    public ReceiverSettleMode getReceiverSettleMode()
+    {
+        return _receiverSettleMode;
+    }
+
+    public UnsignedInteger getMessageFormat()
+    {
+        return _messageFormat;
+    }
+
+
+    public boolean getResume()
+    {
+        return _resume;
+    }
+
+    final void addTransfer(Transfer transfer)
+    {
+        if (_aborted)
+        {
+            throw new IllegalStateException(String.format("Delivery '%s/%d' is already aborted",
+                                                          _deliveryTag,
+                                                          _deliveryId.intValue()));
+        }
+
+        if (_complete)
+        {
+            throw new IllegalStateException(String.format("Delivery '%s/%d' is already completed",
+                                                          _deliveryTag,
+                                                          _deliveryId.intValue()));
+        }
+
+        _transfers.add(transfer);
+        if (Boolean.TRUE.equals(transfer.getAborted()))
         {
-            setComplete(true);
+            _aborted = true;
+            discard();
+        }
+        if(!Boolean.TRUE.equals(transfer.getMore()))
+        {
+            _complete = true;
         }
         if(Boolean.TRUE.equals(transfer.getSettled()))
         {
-            setSettled(true);
+            _settled = true;
         }
-    }
 
-    public UnsignedInteger getDeliveryId()
-    {
-        return _deliveryId;
+        if(Boolean.TRUE.equals(transfer.getResume()))
+        {
+            _resume = true;
+        }
+
+        if (transfer.getState() != null)
+        {
+            DeliveryState currentState;
+            if (_state instanceof TransactionalState)
+            {
+                currentState = ((TransactionalState) _state).getOutcome();
+            }
+            else
+            {
+                currentState = _state;
+            }
+            if (!(currentState instanceof Outcome))
+            {
+                _state = transfer.getState();
+            }
+        }
+
+        if (transfer.getRcvSettleMode() != null)
+        {
+            if (_receiverSettleMode == null)
+            {
+                _receiverSettleMode = transfer.getRcvSettleMode();
+            }
+
+        }
     }
 
     public LinkEndpoint<? extends BaseSource, ? extends BaseTarget> getLinkEndpoint()
@@ -86,13 +171,26 @@ public class Delivery
         return _linkEndpoint;
     }
 
-    public Binary getDeliveryTag()
+
+    public List<QpidByteBuffer> getPayload()
     {
-        return _deliveryTag;
+        List<QpidByteBuffer> fragments = new ArrayList<>(_transfers.size());
+        for (Transfer t : _transfers)
+        {
+            fragments.addAll(t.getPayload());
+            t.dispose();
+        }
+        _transfers.clear();
+        return fragments;
     }
 
-    public int getNumberOfTransfers()
+    public void discard()
     {
-        return _numberOfTransfers;
+        for (Transfer transfer: _transfers)
+        {
+            transfer.dispose();
+        }
+        _transfers.clear();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
index 0d9daa7..287f038 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -144,6 +145,12 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
     }
 
     @Override
+    public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+    {
+
+    }
+
+    @Override
     public void receiveFlow(final Flow flow)
     {
         throw new UnsupportedOperationException("This Link is errant");
@@ -173,9 +180,4 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
         throw new UnsupportedOperationException("This Link is errant");
     }
 
-    @Override
-    public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
-    {
-        throw new UnsupportedOperationException("This Link is errant");
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index 61a0199..94e3b1e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -52,7 +53,7 @@ public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
 
     void remoteDetached(Detach detach);
 
-    void receiveDeliveryState(Delivery unsettled,
+    void receiveDeliveryState(Binary deliveryTag,
                               DeliveryState state,
                               Boolean settled);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 78b6290..95dc34b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +53,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
@@ -80,23 +78,20 @@ import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
+    private static final Symbol PRIORITY = Symbol.valueOf("priority");
 
-    public static final Symbol PRIORITY = Symbol.valueOf("priority");
-    private UnsignedInteger _lastDeliveryId;
-    private Binary _lastDeliveryTag;
-    private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<>();
-    private Map<Binary, MessageInstance> _unsettledMap2 = new HashMap<>();
-    private Binary _transactionId;
-    private Integer _priority;
     private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
     private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
+    private final Map<Binary, OutgoingDelivery> _unsettled = new ConcurrentHashMap<>();
+
+    private volatile Binary _transactionId;
+    private volatile Integer _priority;
     private volatile boolean _draining = false;
-    private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
-    private SendingDestination _destination;
-    private EnumSet<ConsumerOption> _consumerOptions;
-    private FilterManager _consumerFilters;
-    private ConsumerTarget_1_0 _consumerTarget;
-    private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
+    private volatile SendingDestination _destination;
+    private volatile EnumSet<ConsumerOption> _consumerOptions;
+    private volatile FilterManager _consumerFilters;
+    private volatile ConsumerTarget_1_0 _consumerTarget;
+    private volatile MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
 
     public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl<Source, Target> link)
     {
@@ -111,7 +106,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
     }
 
-    public void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
+    private void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
     {
         // TODO FIXME: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it!
         _destination = destination;
@@ -199,7 +194,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         _consumerFilters = filters;
     }
 
-    void createConsumerTarget() throws AmqpErrorException
+    private void createConsumerTarget() throws AmqpErrorException
     {
         final Source source = getSource();
         _consumerTarget = new ConsumerTarget_1_0(this,
@@ -338,7 +333,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         }
 
         attachReceived(attach);
-        initialiseUnsettled();
     }
 
     @Override
@@ -372,17 +366,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         return Role.SENDER;
     }
 
-    public Integer getPriority()
+    private Integer getPriority()
     {
         return _priority;
     }
 
-    public TerminusDurability getTerminusDurability()
-    {
-        return getSource().getDurable();
-    }
-
-    public boolean transfer(final Transfer xfr, final boolean decrementCredit)
+    void transfer(final Transfer xfr, final boolean decrementCredit)
     {
         Session_1_0 s = getSession();
         xfr.setMessageFormat(UnsignedInteger.ZERO);
@@ -395,27 +384,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
 
         xfr.setHandle(getLocalHandle());
 
-        s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
-
-        if(!Boolean.TRUE.equals(xfr.getSettled()))
-        {
-            _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
-        }
-
-        if(Boolean.TRUE.equals(xfr.getMore()))
-        {
-            _lastDeliveryTag = xfr.getDeliveryTag();
-        }
-        else
-        {
-            _lastDeliveryTag = null;
-        }
-
-        return true;
+        s.sendTransfer(xfr, this, true);
     }
 
 
-    public boolean drained()
+    boolean drained()
     {
         if (_draining)
         {
@@ -514,11 +487,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
             Modified state = new Modified();
             state.setDeliveryFailed(true);
 
-            for (UnsettledAction action : _unsettledActionMap.values())
+            for (OutgoingDelivery delivery : _unsettled.values())
             {
-                action.process(state, Boolean.TRUE);
+                UnsettledAction action = delivery.getAction();
+                if (action != null)
+                {
+                    action.process(state, Boolean.TRUE);
+                    delivery.setAction(null);
+                }
             }
-            _unsettledActionMap.clear();
 
             Error closingError = null;
             if (getDestination() instanceof ExchangeDestination
@@ -558,20 +535,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         }
     }
 
-    public void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance queueEntry)
+    void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance messageInstance)
     {
-        _unsettledActionMap.put(tag, unsettledAction);
-        if(getTransactionId() == null)
-        {
-            _unsettledMap2.put(tag, queueEntry);
-        }
-
+        _unsettled.put(tag, new OutgoingDelivery(messageInstance, unsettledAction, null));
     }
 
     @Override
-    protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+    protected void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
     {
-        UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+        UnsettledAction action = _unsettled.get(deliveryTag).getAction();
         boolean localSettle = false;
         if(action != null)
         {
@@ -583,9 +555,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         }
         if(Boolean.TRUE.equals(settled) || localSettle)
         {
-            _unsettledActionMap.remove(deliveryTag);
-            _unsettledMap.remove(deliveryTag);
-            _unsettledMap2.remove(deliveryTag);
+            _unsettled.remove(deliveryTag);
         }
     }
 
@@ -602,23 +572,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                && getSession().hasCreditToSend();
     }
 
-    public UnsignedInteger getLastDeliveryId()
-    {
-        return _lastDeliveryId;
-    }
-
-    public void setLastDeliveryId(final UnsignedInteger deliveryId)
-    {
-        _lastDeliveryId = deliveryId;
-    }
-
     public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
     {
-        UnsignedInteger deliveryId;
-        if (settled && (deliveryId = _unsettledMap.remove(deliveryTag)) != null)
+        if (settled && (_unsettled.remove(deliveryTag) != null))
         {
-            _unsettledMap2.remove(deliveryTag);
-            getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
+            getSession().updateDisposition(getRole(), deliveryTag, state, settled);
         }
     }
 
@@ -677,33 +635,34 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         getLink().setTarget(target);
 
 
-        final MessageInstanceConsumer consumer = getConsumer();
+        final MessageInstanceConsumer oldConsumer = getConsumer();
         createConsumerTarget();
         _resumeAcceptedTransfers.clear();
         _resumeFullTransfers.clear();
         final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
-        Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
-        Map initialUnsettledMap = getInitialUnsettledMap();
+        Map<Binary, OutgoingDelivery> unsettledCopy = new HashMap<>(_unsettled);
+        Map<Binary, DeliveryState> remoteUnsettled =
+                attach.getUnsettled() == null ? Collections.emptyMap() : new HashMap<>(attach.getUnsettled());
 
-        for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+        for (Map.Entry<Binary, OutgoingDelivery> entry : unsettledCopy.entrySet())
         {
             Binary deliveryTag = entry.getKey();
-            final MessageInstance queueEntry = entry.getValue();
-            if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+            final MessageInstance queueEntry = entry.getValue().getMessageInstance();
+            if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
             {
                 queueEntry.setRedelivered();
-                queueEntry.release(consumer);
-                _unsettledMap2.remove(deliveryTag);
+                queueEntry.release(oldConsumer);
+                _unsettled.remove(deliveryTag);
             }
-            else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
+            else if (remoteUnsettled.get(deliveryTag) instanceof Outcome)
             {
-                Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+                Outcome outcome = (Outcome) remoteUnsettled.get(deliveryTag);
 
                 if (outcome instanceof Accepted)
                 {
-                    AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
-                    if (consumer.acquires())
+                    if (oldConsumer.acquires())
                     {
+                        AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
                         if (queueEntry.acquire() || queueEntry.isAcquired())
                         {
                             txn.dequeue(Collections.singleton(queueEntry),
@@ -723,15 +682,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                 }
                 else if (outcome instanceof Released)
                 {
-                    AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
-                    if (consumer.acquires())
+                    if (oldConsumer.acquires())
                     {
+                        AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
                         txn.dequeue(Collections.singleton(queueEntry),
                                     new ServerTransaction.Action()
                                     {
                                         public void postCommit()
                                         {
-                                            queueEntry.release(consumer);
+                                            queueEntry.release(oldConsumer);
                                         }
 
                                         public void onRollback()
@@ -740,14 +699,17 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
                                     });
                     }
                 }
-                //_unsettledMap.remove(deliveryTag);
-                initialUnsettledMap.remove(deliveryTag);
+
+                // TODO: handle the Rejected and Modified outcomes
+
+                remoteUnsettled.remove(deliveryTag);
                 _resumeAcceptedTransfers.add(deliveryTag);
             }
             else
             {
                 _resumeFullTransfers.add(queueEntry);
-                // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+
+                // TODO: exists in the receiver's unsettled map, but has not yet got an outcome ... should resend with resume = true
             }
         }
 
@@ -755,22 +717,22 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     }
 
     @Override
-    public void initialiseUnsettled()
+    protected Map<Binary, DeliveryState> getLocalUnsettled()
     {
-        Map<Binary, MessageInstance> _localUnsettled = new HashMap<>(_unsettledMap2);
-
-        for (Map.Entry<Binary, MessageInstance> entry : _localUnsettled.entrySet())
+        Map<Binary, DeliveryState> unsettled = new HashMap<>();
+        for (Map.Entry<Binary, OutgoingDelivery> entry : _unsettled.entrySet())
         {
-            entry.setValue(null);
+            unsettled.put(entry.getKey(), entry.getValue().getLocalState());
         }
+        return unsettled;
     }
 
-    public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
+    private MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
     {
         return _consumer;
     }
 
-    public ConsumerTarget_1_0 getConsumerTarget()
+    ConsumerTarget_1_0 getConsumerTarget()
     {
         return _consumerTarget;
     }
@@ -784,4 +746,45 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
         _destination = destination;
     }
+
+    private static class OutgoingDelivery
+    {
+        private final MessageInstance _messageInstance;
+        private volatile UnsettledAction _action;
+        private volatile DeliveryState _localState;
+
+        public OutgoingDelivery(final MessageInstance messageInstance,
+                                final UnsettledAction action,
+                                final DeliveryState localState)
+        {
+            _messageInstance = messageInstance;
+            _action = action;
+            _localState = localState;
+        }
+
+        public MessageInstance getMessageInstance()
+        {
+            return _messageInstance;
+        }
+
+        public UnsettledAction getAction()
+        {
+            return _action;
+        }
+
+        public DeliveryState getLocalState()
+        {
+            return _localState;
+        }
+
+        public void setLocalState(final DeliveryState localState)
+        {
+            _localState = localState;
+        }
+
+        public void setAction(final UnsettledAction action)
+        {
+            _action = action;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 29f1a01..65a5265 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -35,7 +35,6 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -72,6 +71,9 @@ import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistryImpl;
+import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
 import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
@@ -155,9 +157,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
     private UnsignedInteger _lastSentIncomingLimit;
 
-    private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
-    private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
-
+    private final DeliveryRegistry _outgoingDeliveryRegistry = new DeliveryRegistryImpl();
+    private final DeliveryRegistry _incomingDeliveryRegistry = new DeliveryRegistryImpl();
 
     private final Error _sessionEndedLinkError =
             new Error(LinkError.DETACH_FORCED,
@@ -231,7 +232,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         }
     }
 
-    public void updateDisposition(final Role role,
+    void updateDisposition(final Role role,
                                   final UnsignedInteger first,
                                   final UnsignedInteger last,
                                   final DeliveryState state, final boolean settled)
@@ -248,13 +249,13 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
         if (settled)
         {
-            final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
-                    role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
+            final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+
             SequenceNumber pos = new SequenceNumber(first.intValue());
             SequenceNumber end = new SequenceNumber(last.intValue());
             while (pos.compareTo(end) <= 0)
             {
-                unsettled.remove(UnsignedInteger.valueOf(pos.intValue()));
+                deliveryRegistry.removeDelivery(UnsignedInteger.valueOf(pos.intValue()));
                 pos.incr();
             }
         }
@@ -263,6 +264,21 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         //TODO - check send flow
     }
 
+    void updateDisposition(final Role role,
+                           final Binary deliveryTag,
+                           final DeliveryState state,
+                           final boolean settled)
+    {
+        final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+        UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag);
+        if (deliveryId == null)
+        {
+            throw new ConnectionScopedRuntimeException(String.format(
+                    "Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
+        }
+        updateDisposition(role, deliveryId, deliveryId, state, settled);
+    }
+
     public boolean hasCreditToSend()
     {
         boolean b = _remoteIncomingWindow > 0;
@@ -283,32 +299,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         if (newDelivery)
         {
             deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
-            endpoint.setLastDeliveryId(deliveryId);
+            xfr.setDeliveryId(deliveryId);
             if (!settled)
             {
-                final Delivery delivery = new Delivery(xfr, endpoint);
-                _outgoingUnsettled.put(deliveryId, delivery);
-                endpoint.addUnsettled(delivery);
-            }
-        }
-        else
-        {
-            deliveryId = endpoint.getLastDeliveryId();
-            final Delivery delivery = _outgoingUnsettled.get(deliveryId);
-            if (delivery != null)
-            {
-                if (!settled)
-                {
-                    delivery.addTransfer(xfr);
-                }
-                else
-                {
-                    endpoint.settle(delivery.getDeliveryTag());
-                    _outgoingUnsettled.remove(deliveryId);
-                }
+                final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
+                _outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
             }
         }
-        xfr.setDeliveryId(deliveryId);
+
         _remoteIncomingWindow--;
         try
         {
@@ -461,42 +459,40 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     {
         Role dispositionRole = disposition.getRole();
 
-        LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
+        DeliveryRegistry unsettledDeliveries;
 
         if(dispositionRole == Role.RECEIVER)
         {
-            unsettledTransfers = _outgoingUnsettled;
+            unsettledDeliveries = _outgoingDeliveryRegistry;
         }
         else
         {
-            unsettledTransfers = _incomingUnsettled;
-
+            unsettledDeliveries = _incomingDeliveryRegistry;
         }
 
         SequenceNumber deliveryId = new SequenceNumber(disposition.getFirst().intValue());
         SequenceNumber last;
         if(disposition.getLast() == null)
         {
-            last = deliveryId;
+            last = new SequenceNumber(deliveryId.intValue());
         }
         else
         {
             last = new SequenceNumber(disposition.getLast().intValue());
         }
 
-
         while(deliveryId.compareTo(last)<=0)
         {
             UnsignedInteger deliveryIdUnsigned = UnsignedInteger.valueOf(deliveryId.intValue());
-            Delivery delivery = unsettledTransfers.get(deliveryIdUnsigned);
-            if(delivery != null)
+            UnsettledDelivery unsettledDelivery = unsettledDeliveries.getDelivery(deliveryIdUnsigned);
+
+            if(unsettledDelivery != null)
             {
-                delivery.getLinkEndpoint().receiveDeliveryState(delivery,
-                                                                disposition.getState(),
-                                                                disposition.getSettled());
+                LinkEndpoint<?,?> linkEndpoint  = unsettledDelivery.getLinkEndpoint();
+                linkEndpoint.receiveDeliveryState(unsettledDelivery.getDeliveryTag(), disposition.getState(), disposition.getSettled());
                 if (Boolean.TRUE.equals(disposition.getSettled()))
                 {
-                    unsettledTransfers.remove(deliveryIdUnsigned);
+                    unsettledDeliveries.removeDelivery(deliveryIdUnsigned);
                 }
             }
             deliveryId.incr();
@@ -607,62 +603,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         else
         {
             AbstractReceivingLinkEndpoint endpoint = ((AbstractReceivingLinkEndpoint) linkEndpoint);
-
-            UnsignedInteger deliveryId = transfer.getDeliveryId();
-            if (deliveryId == null)
-            {
-                deliveryId = endpoint.getLastDeliveryId();
-            }
-
-            Delivery delivery = _incomingUnsettled.get(deliveryId);
-            if (delivery == null)
-            {
-                delivery = new Delivery(transfer, endpoint);
-                _incomingUnsettled.put(deliveryId, delivery);
-
-                if (Boolean.TRUE.equals(transfer.getMore()))
-                {
-                    endpoint.setLastDeliveryId(transfer.getDeliveryId());
-                }
-            }
-            else
-            {
-                if (delivery.getDeliveryId().equals(deliveryId))
-                {
-                    delivery.addTransfer(transfer);
-
-                    if (!Boolean.TRUE.equals(transfer.getMore()))
-                    {
-                        endpoint.setLastDeliveryId(null);
-                    }
-                }
-                else
-                {
-                    End reply = new End();
-
-                    Error error = new Error();
-                    error.setCondition(AmqpError.ILLEGAL_STATE);
-                    error.setDescription("TRANSFER called on Session for link handle "
-                                         + inputHandle
-                                         + " with incorrect delivery id "
-                                         + transfer.getDeliveryId());
-                    reply.setError(error);
-                    _connection.sendEnd(_sendingChannel, reply, true);
-
-                    return;
-
-                }
-            }
-
-            Error error = endpoint.receiveTransfer(transfer, delivery);
-            if(error != null)
-            {
-                endpoint.close(error);
-            }
-            if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
-            {
-                _incomingUnsettled.remove(deliveryId);
-            }
+            endpoint.receiveTransfer(transfer);
         }
     }
 
@@ -1536,6 +1477,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         }
         _endpointToOutputHandle.remove(linkEndpoint);
         _associatedLinkEndpoints.remove(linkEndpoint);
+        if (linkEndpoint.getRole() == Role.RECEIVER)
+        {
+            getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
+        }
+        else
+        {
+            getOutgoingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
+        }
     }
 
     private void detach(UnsignedInteger handle, Detach detach)
@@ -1614,6 +1563,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         return primaryDomain;
     }
 
+    DeliveryRegistry getOutgoingDeliveryRegistry()
+    {
+        return _outgoingDeliveryRegistry;
+    }
+
+    DeliveryRegistry getIncomingDeliveryRegistry()
+    {
+        return _incomingDeliveryRegistry;
+    }
+
     private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
     {
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 96dbdc1..048fa20 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -53,20 +52,14 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.txn.AutoCommitTransaction;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 
 public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Target>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
 
-    private ArrayList<Transfer> _incompleteMessage;
-    private boolean _resumedMessage;
-    private Binary _messageDeliveryTag;
-    private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
     private ReceivingDestination _receivingDestination;
 
     public StandardReceivingLinkEndpoint(final Session_1_0 session,
@@ -89,107 +82,30 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
     }
 
     @Override
-    protected Error messageTransfer(Transfer xfr)
+    protected Error receiveDelivery(Delivery delivery)
     {
-        List<QpidByteBuffer> fragments = null;
-
-        org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
-        final Binary deliveryTag = xfr.getDeliveryTag();
-        UnsignedInteger messageFormat = null;
-        ReceiverSettleMode transferReceiverSettleMode = null;
-        Error error = null;
-        if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
-        {
-            _incompleteMessage = new ArrayList<>();
-            _incompleteMessage.add(xfr);
-            _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
-            _messageDeliveryTag = deliveryTag;
-            return null;
-        }
-        else if(_incompleteMessage != null)
-        {
-            _incompleteMessage.add(xfr);
-            if(Boolean.TRUE.equals(xfr.getMore()))
-            {
-                return null;
-            }
-
-            fragments = new ArrayList<>(_incompleteMessage.size());
-
-            for (Transfer t : _incompleteMessage)
-            {
-                if (t.getMessageFormat() != null && messageFormat == null)
-                {
-                    messageFormat = t.getMessageFormat();
-                }
-
-                if (t.getRcvSettleMode() != null)
-                {
-                    if (transferReceiverSettleMode == null)
-                    {
-                        transferReceiverSettleMode = t.getRcvSettleMode();
-                    }
-                    else if (!transferReceiverSettleMode.equals(t.getRcvSettleMode()))
-                    {
-                        error = new Error(AmqpError.INVALID_FIELD,
-                                          "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
-                        break;
-                    }
-                }
-                fragments.addAll(t.getPayload());
-                t.dispose();
-            }
-            _incompleteMessage=null;
-
-        }
-        else
-        {
-            _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
-            _messageDeliveryTag = deliveryTag;
-            fragments = xfr.getPayload();
-            messageFormat = xfr.getMessageFormat();
-            transferReceiverSettleMode = xfr.getRcvSettleMode();
-            xfr.dispose();
-        }
-
-        if (error == null && !ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
-            && ReceiverSettleMode.SECOND.equals(transferReceiverSettleMode))
-        {
-            error = new Error(AmqpError.INVALID_FIELD,
-                              "Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
-
-        }
-
-        if (error != null)
-        {
-            for (QpidByteBuffer fragment : fragments)
-            {
-                fragment.dispose();
-            }
-            return error;
-        }
+        ReceiverSettleMode transferReceiverSettleMode = delivery.getReceiverSettleMode();
 
-        if(_resumedMessage)
+        if(delivery.getResume())
         {
-            if(_unsettledMap.containsKey(_messageDeliveryTag))
+            DeliveryState deliveryState = _unsettled.get(delivery.getDeliveryTag());
+            if (deliveryState instanceof Outcome)
             {
-                Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
                 boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
-                updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
-                if(settled)
-                {
-                    _unsettledMap.remove(_messageDeliveryTag);
-                }
+                updateDisposition(delivery.getDeliveryTag(), deliveryState, settled);
+                return null;
             }
             else
             {
-                throw new ConnectionScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
+                // TODO: resumed delivery whose tag has no recorded outcome - decide whether the message should be created here
             }
         }
         else
         {
             ServerMessage<?> serverMessage;
-
+            UnsignedInteger messageFormat = delivery.getMessageFormat();
+            org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = delivery.getState();
+            List<QpidByteBuffer> fragments = delivery.getPayload();
             MessageFormat format = MessageFormatRegistry.getFormat(messageFormat == null ? 0 : messageFormat.intValue());
             if(format != null)
             {
@@ -279,7 +195,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                     {
                         if (transactionId == null)
                         {
-                            resultantState = (DeliveryState) outcome;
+                            resultantState = outcome;
                         }
                         else
                         {
@@ -302,12 +218,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
                     boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
 
-                    if (!settled)
-                    {
-                        _unsettledMap.put(deliveryTag, outcome);
-                    }
-
-                    updateDisposition(deliveryTag, resultantState, settled);
+                    updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
 
                     getSession().getAMQPConnection()
                                 .registerMessageReceived(serverMessage.getSize(), arrivalTime);
@@ -319,12 +230,12 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
                         {
                             public void postCommit()
                             {
-                                updateDisposition(deliveryTag, null, true);
+                                updateDisposition(delivery.getDeliveryTag(), null, true);
                             }
 
                             public void onRollback()
                             {
-                                updateDisposition(deliveryTag, null, true);
+                                updateDisposition(delivery.getDeliveryTag(), null, true);
                             }
                         });
                     }
@@ -375,16 +286,13 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         }
     }
 
-
     @Override
-    protected void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+    protected Map<Binary, DeliveryState> getLocalUnsettled()
     {
-        if(Boolean.TRUE.equals(settled))
-        {
-            _unsettledMap.remove(deliveryTag);
-        }
+        return new HashMap<>(_unsettled);
     }
 
+
     @Override
     public void attachReceived(final Attach attach) throws AmqpErrorException
     {
@@ -433,26 +341,20 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         setCapabilities(targetCapabilities);
         setDestination(destination);
 
-        Map initialUnsettledMap = getInitialUnsettledMap();
-        Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
-        for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
+        Map remoteUnsettled = attach.getUnsettled();
+        Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
+        for(Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
         {
             Binary deliveryTag = entry.getKey();
-            if(!initialUnsettledMap.containsKey(deliveryTag))
+            if(remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
             {
-                _unsettledMap.remove(deliveryTag);
+                _unsettled.remove(deliveryTag); // TODO: removal is based on the assumption that the remote unsettled map is complete
             }
         }
 
         getLink().setTermini(source, target);
     }
 
-    @Override
-    public void initialiseUnsettled()
-    {
-        _localUnsettled = new HashMap(_unsettledMap);
-    }
-
     public ReceivingDestination getReceivingDestination()
     {
         return _receivingDestination;
@@ -515,7 +417,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         }
 
         attachReceived(attach);
-        initialiseUnsettled();
     }
 
     @Override
@@ -528,21 +429,4 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
         attachReceived(attach);
     }
-
-    @Override
-    protected void detach(Error error, boolean close)
-    {
-        super.detach(error, close);
-
-        if (_incompleteMessage != null)
-        {
-            for (Transfer t : _incompleteMessage)
-            {
-                t.dispose();
-            }
-            _incompleteMessage = null;
-        }
-        _messageDeliveryTag = null;
-        _resumedMessage = false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 25dd8bd..108943d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -19,8 +19,8 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +44,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.server.txn.LocalTransaction;
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -52,7 +51,6 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator>
 {
     private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
-    private ArrayList<Transfer> _incompleteMessage;
 
     public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link)
     {
@@ -67,43 +65,11 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
     }
 
     @Override
-    protected Error messageTransfer(Transfer xfr)
+    protected Error receiveDelivery(Delivery delivery)
     {
-        List<QpidByteBuffer> payload = new ArrayList<>();
+        List<QpidByteBuffer> payload = delivery.getPayload();
 
-        final Binary deliveryTag = xfr.getDeliveryTag();
 
-        if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
-        {
-            _incompleteMessage = new ArrayList<Transfer>();
-            _incompleteMessage.add(xfr);
-            return null;
-        }
-        else if(_incompleteMessage != null)
-        {
-            _incompleteMessage.add(xfr);
-            if(Boolean.TRUE.equals(xfr.getMore()))
-            {
-                return null;
-            }
-
-            for(Transfer t : _incompleteMessage)
-            {
-                final List<QpidByteBuffer> bufs = t.getPayload();
-                if(bufs != null)
-                {
-                    payload.addAll(bufs);
-                }
-                t.dispose();
-            }
-            _incompleteMessage=null;
-
-        }
-        else
-        {
-            payload.addAll(xfr.getPayload());
-            xfr.dispose();
-        }
 
         // Only interested in the amqp-value section that holds the message to the coordinator
         try
@@ -132,7 +98,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
                         session.incrementStartedTransactions();
 
                         state.setTxnId(session.integerToBinary(txn.getId()));
-                        updateDisposition(deliveryTag, state, true);
+                        updateDisposition(delivery.getDeliveryTag(), state, true);
 
                     }
                     else if(command instanceof Discharge)
@@ -159,7 +125,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
 
                         if (error == null)
                         {
-                            updateDisposition(deliveryTag, outcome, true);
+                            updateDisposition(delivery.getDeliveryTag(), outcome, true);
                         }
                         return error;
                     }
@@ -250,6 +216,12 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
     }
 
     @Override
+    protected Map<Binary, DeliveryState> getLocalUnsettled()
+    {
+        return Collections.emptyMap();
+    }
+
+    @Override
     protected void reattachLink(final Attach attach) throws AmqpErrorException
     {
         throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot reattach a Coordinator Link."));
@@ -283,20 +255,10 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
     }
 
     @Override
-    protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
-    {
-
-    }
-
-    @Override
     public void attachReceived(final Attach attach) throws AmqpErrorException
     {
         super.attachReceived(attach);
         setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
     }
 
-    @Override
-    public void initialiseUnsettled()
-    {
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
index 7a47703..55588a1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
@@ -131,7 +131,7 @@ public class ValueHandler implements DescribedTypeConstructorRegistry.Source
                 {
                     position--;
                 }
-                originalPositions[i] = position;
+                originalPositions[i - firstBufferWithAvailable] = position;
             }
 
             Object descriptor = parse(in);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
new file mode 100644
index 0000000..8a054fd
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public interface DeliveryRegistry
+{
+    void addDelivery(UnsignedInteger deliveryId, UnsettledDelivery unsettledDelivery);
+    void removeDelivery(UnsignedInteger deliveryId);
+    UnsettledDelivery getDelivery(UnsignedInteger deliveryId);
+    void removeDeliveriesForLinkEndpoint(LinkEndpoint<?, ?> linkEndpoint);
+    UnsignedInteger getDeliveryIdByTag(Binary deliveryTag);
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
new file mode 100644
index 0000000..466164f
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public class DeliveryRegistryImpl implements DeliveryRegistry
+{
+    private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>();
+    private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
+
+    @Override
+    public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery)
+    {
+        _deliveries.put(deliveryId, unsettledDelivery);
+        _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId);
+    }
+
+    @Override
+    public void removeDelivery(final UnsignedInteger deliveryId)
+    {
+        UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId);
+        if (unsettledDelivery != null)
+        {
+            _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+        }
+    }
+
+    @Override
+    public UnsettledDelivery getDelivery(final UnsignedInteger deliveryId)
+    {
+        return _deliveries.get(deliveryId);
+    }
+
+    @Override
+    public void removeDeliveriesForLinkEndpoint(final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        Iterator<UnsettledDelivery> iterator = _deliveries.values().iterator();
+        while (iterator.hasNext())
+        {
+            UnsettledDelivery unsettledDelivery = iterator.next();
+            if (unsettledDelivery.getLinkEndpoint() == linkEndpoint)
+            {
+                iterator.remove();
+                _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+            }
+        }
+    }
+
+    @Override
+    public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag)
+    {
+        return _deliveryIds.get(deliveryTag);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-7842 : [AMQP 1.0] Refactor transfer functionality

Posted by or...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
new file mode 100644
index 0000000..48d2ab1
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+
+public class UnsettledDelivery
+{
+    private final Binary _deliveryTag;
+    private final LinkEndpoint<?,?> _linkEndpoint;
+
+    public UnsettledDelivery(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
+    {
+        _deliveryTag = deliveryTag;
+        _linkEndpoint = linkEndpoint;
+    }
+
+    public Binary getDeliveryTag()
+    {
+        return _deliveryTag;
+    }
+
+    public LinkEndpoint<?, ?> getLinkEndpoint()
+    {
+        return _linkEndpoint;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
index 968ce4e..ee66d7b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
@@ -21,7 +21,7 @@
 
 package org.apache.qpid.server.protocol.v1_0.type;
 
-public interface Outcome
+public interface Outcome extends DeliveryState
 {
     Symbol getSymbol();
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
index 3bae54d..e8b404a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
@@ -30,6 +30,8 @@ import java.util.Map;
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -62,7 +64,7 @@ public class Attach implements FrameBody
     private BaseTarget _target;
 
     @CompositeTypeField
-    private Map _unsettled;
+    private Map<Binary, DeliveryState> _unsettled;
 
     @CompositeTypeField
     private Boolean _incompleteUnsettled;
@@ -80,7 +82,7 @@ public class Attach implements FrameBody
     private Symbol[] _desiredCapabilities;
 
     @CompositeTypeField
-    private Map _properties;
+    private Map<Symbol, Object> _properties;
 
     public String getName()
     {
@@ -152,7 +154,7 @@ public class Attach implements FrameBody
         _target = target;
     }
 
-    public Map getUnsettled()
+    public Map<Binary, DeliveryState> getUnsettled()
     {
         return _unsettled;
     }
@@ -212,12 +214,12 @@ public class Attach implements FrameBody
         _desiredCapabilities = desiredCapabilities;
     }
 
-    public Map getProperties()
+    public Map<Symbol, Object> getProperties()
     {
         return _properties;
     }
 
-    public void setProperties(Map properties)
+    public void setProperties(Map<Symbol, Object> properties)
     {
         _properties = properties;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
index 0e0489a..001b5fe 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
@@ -60,7 +60,7 @@ public class Begin implements FrameBody
     private Symbol[] _desiredCapabilities;
 
     @CompositeTypeField
-    private Map _properties;
+    private Map<Symbol, Object> _properties;
 
     public UnsignedShort getRemoteChannel()
     {
@@ -132,12 +132,12 @@ public class Begin implements FrameBody
         _desiredCapabilities = desiredCapabilities;
     }
 
-    public Map getProperties()
+    public Map<Symbol, Object> getProperties()
     {
         return _properties;
     }
 
-    public void setProperties(Map properties)
+    public void setProperties(Map<Symbol, Object> properties)
     {
         _properties = properties;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
index 1b8722a..8b13cf3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
@@ -26,6 +26,7 @@ package org.apache.qpid.server.protocol.v1_0.type.transport;
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -34,16 +35,22 @@ public class Disposition implements FrameBody
 {
     private ByteBuffer _payload;
 
+    @CompositeTypeField(mandatory = true)
     private Role _role;
 
+    @CompositeTypeField(mandatory = true)
     private UnsignedInteger _first;
 
+    @CompositeTypeField
     private UnsignedInteger _last;
 
+    @CompositeTypeField
     private Boolean _settled;
 
+    @CompositeTypeField
     private DeliveryState _state;
 
+    @CompositeTypeField
     private Boolean _batchable;
 
     public Role getRole()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
index 63b567e..8c87c7b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
 public class Flow implements FrameBody
@@ -66,7 +67,7 @@ public class Flow implements FrameBody
     private Boolean _echo;
 
     @CompositeTypeField
-    private Map _properties;
+    private Map<Symbol, Object> _properties;
 
     public UnsignedInteger getNextIncomingId()
     {
@@ -168,12 +169,12 @@ public class Flow implements FrameBody
         _echo = echo;
     }
 
-    public Map getProperties()
+    public Map<Symbol, Object> getProperties()
     {
         return _properties;
     }
 
-    public void setProperties(Map properties)
+    public void setProperties(Map<Symbol, Object> properties)
     {
         _properties = properties;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
index 261543f..303cc28 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
@@ -41,6 +41,7 @@ public interface BrokerAdmin extends Pluggable
     void createQueue(String queueName);
     void deleteQueue(String queueName);
     void putMessageOnQueue(String queueName, String... messages);
+    int getQueueDepthMessages(String testQueueName);
 
     boolean supportsRestart();
     ListenableFuture<Void> restart();
@@ -48,10 +49,13 @@ public interface BrokerAdmin extends Pluggable
     boolean isSASLSupported();
     boolean isSASLMechanismSupported(String mechanismName);
     boolean isWebSocketSupported();
+    boolean isQueueDepthSupported();
 
     String getValidUsername();
     String getValidPassword();
 
+
+
     enum PortType
     {
         ANONYMOUS_AMQP,

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
index 2680246..e4dd859 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.SystemLauncher;
 import org.apache.qpid.server.SystemLauncherListener;
 import org.apache.qpid.server.logging.logback.LogbackLoggingSystemLauncherListener;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.Container;
 import org.apache.qpid.server.model.IllegalStateTransitionException;
@@ -306,6 +307,13 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public int getQueueDepthMessages(final String testQueueName)
+    {
+        Queue queue = _currentVirtualHostNode.getVirtualHost().getChildByName(Queue.class, testQueueName);  // looks the queue up by name on the current virtual host
+        return queue.getQueueDepthMessages();  // NOTE(review): would NPE if the lookup yields null for an unknown queue - confirm getChildByName's contract
+    }
+
+    @Override
     public boolean supportsRestart()
     {
         return _isPersistentStore;
@@ -348,6 +356,12 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public boolean isQueueDepthSupported()
+    {
+        return true;
+    }
+
+    @Override
     public String getValidUsername()
     {
         return "guest";

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
index 045df9b..a580333 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
@@ -95,6 +95,12 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public int getQueueDepthMessages(final String testQueueName)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
     public boolean supportsRestart()
     {
         return false;
@@ -119,6 +125,12 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
     }
 
     @Override
+    public boolean isQueueDepthSupported()
+    {
+        return false;
+    }
+
+    @Override
     public boolean isSASLMechanismSupported(final String mechanismName)
     {
         return true;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index eac8770..9da8a8e 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -22,8 +22,14 @@ package org.apache.qpid.tests.protocol.v1_0;
 
 import static com.google.common.util.concurrent.Futures.allAsList;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 
 import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -38,27 +44,39 @@ import java.util.concurrent.TimeoutException;
 import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
 import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class Interaction
@@ -71,6 +89,7 @@ public class Interaction
     private final Detach _detach;
     private final Flow _flow;
     private final Transfer _transfer;
+    private final Disposition _disposition;
     private final FrameTransport _transport;
     private final SaslInit _saslInit;
     private final SaslResponse _saslResponse;
@@ -79,6 +98,9 @@ public class Interaction
     private UnsignedShort _sessionChannel;
     private Response<?> _latestResponse;
     private ListenableFuture<?> _latestFuture;
+    private int _deliveryIdCounter;
+    private List<Transfer> _latestDelivery;
+    private Object _decodedLatestDelivery;
 
     Interaction(final FrameTransport frameTransport)
     {
@@ -120,7 +142,10 @@ public class Interaction
         _transfer = new Transfer();
         _transfer.setHandle(defaultLinkHandle);
         _transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
-        _transfer.setDeliveryId(UnsignedInteger.ZERO);
+        _transfer.setDeliveryId(getNextDeliveryId());
+
+        _disposition = new Disposition();
+        _disposition.setFirst(UnsignedInteger.ZERO);
     }
 
     /////////////////////////
@@ -316,6 +341,12 @@ public class Interaction
         return this;
     }
 
+    public Interaction attachSndSettleMode(final SenderSettleMode senderSettleMode)
+    {
+        _attach.setSndSettleMode(senderSettleMode);
+        return this;
+    }
+
     public Interaction attachSource(Source source)
     {
         _attach.setSource(source);
@@ -344,6 +375,14 @@ public class Interaction
         return this;
     }
 
+    public Interaction attachSourceDefaultOutcome(final Outcome defaultOutcome)
+    {
+        Source source = ((Source) _attach.getSource());  // NOTE(review): assumes a messaging Source is already set on _attach - confirm against callers
+        source.setDefaultOutcome(defaultOutcome);
+        _attach.setSource(source);  // writes back the same instance that was read above
+        return this;
+    }
+
     public Interaction attachTargetAddress(final String address)
     {
         final Target target = ((Target) _attach.getTarget());
@@ -434,6 +473,12 @@ public class Interaction
         return this;
     }
 
+    public Interaction flowProperties(Map<Symbol, Object> properties)
+    {
+        _flow.setProperties(properties);
+        return this;
+    }
+
     public Interaction flow() throws Exception
     {
         sendPerformativeAndChainFuture(_flow, _sessionChannel);
@@ -456,6 +501,19 @@ public class Interaction
         return this;
     }
 
+    public Interaction transferState(final DeliveryState state)
+    {
+        _transfer.setState(state);
+        return this;
+    }
+
+    public Interaction transferTransactionalState(final Binary transactionalId)
+    {
+        TransactionalState transactionalState = new TransactionalState();
+        transactionalState.setTxnId(transactionalId);  // only the txn id is set here; no outcome, unlike dispositionTransactionalState
+        return transferState(transactionalState);  // stored as the state of the shared _transfer
+    }
+
     public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
     {
         _transfer.setDeliveryId(deliveryId);
@@ -474,13 +532,19 @@ public class Interaction
         return this;
     }
 
+    public Interaction transferAborted(final Boolean aborted)
+    {
+        _transfer.setAborted(aborted);
+        return this;
+    }
+
     public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
     {
         _transfer.setMessageFormat(messageFormat);
         return this;
     }
 
-    public Interaction transferPayload(final List<QpidByteBuffer> payload)
+    public Interaction setPayloadOnTransfer(final List<QpidByteBuffer> payload)
     {
         _transfer.setPayload(payload);
         return this;
@@ -488,17 +552,21 @@ public class Interaction
 
     public Interaction transferPayloadData(final Object payload)
     {
+        setPayloadOnTransfer(_transfer, payload);
+        return this;
+    }
+
+    private void setPayloadOnTransfer(final Transfer transfer, final Object payload)
+    {
         AmqpValue amqpValue = new AmqpValue(payload);
         final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
         final List<QpidByteBuffer> encodedForm = section.getEncodedForm();
-        _transfer.setPayload(encodedForm);
-
+        transfer.setPayload(encodedForm);
         section.dispose();
         for (QpidByteBuffer qbb : encodedForm)
         {
             qbb.dispose();
         }
-        return this;
     }
 
     public Interaction transferSettled(final Boolean settled)
@@ -513,6 +581,126 @@ public class Interaction
         return this;
     }
 
+    /////////////////
+    // disposition //
+    /////////////////
+
+    public Interaction dispositionSettled(final boolean settled)
+    {
+        _disposition.setSettled(settled);
+        return this;
+    }
+
+    public Interaction dispositionState(final DeliveryState state)
+    {
+        _disposition.setState(state);
+        return this;
+    }
+
+
+    public Interaction dispositionTransactionalState(final Binary transactionId, final Outcome outcome)
+    {
+        TransactionalState state = new TransactionalState();  // carries both the transaction id and the outcome
+        state.setTxnId(transactionId);
+        state.setOutcome(outcome);
+        return dispositionState(state);  // stored as the state of the shared _disposition
+    }
+
+    public Interaction dispositionRole(final Role role)
+    {
+        _disposition.setRole(role);
+        return this;
+    }
+
+    public Interaction disposition() throws Exception
+    {
+        sendPerformativeAndChainFuture(_disposition, _sessionChannel);
+        return this;
+    }
+
+    /////////////////
+    // transaction //
+    /////////////////
+
+    public Interaction txnAttachCoordinatorLink(InteractionTransactionalState transactionalState) throws Exception
+    {
+        Attach attach = new Attach();  // a fresh Attach, separate from the interaction's shared _attach
+        attach.setName("testTransactionCoordinator-" + transactionalState.getHandle());
+        attach.setHandle(transactionalState.getHandle());
+        attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+        attach.setTarget(new Coordinator());  // txnDeclare/txnDischarge later send their transfers on this same handle
+        attach.setRole(Role.SENDER);
+        Source source = new Source();
+        attach.setSource(source);
+        source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);  // set after setSource; same instance, so the attach sees it
+        sendPerformativeAndChainFuture(attach, _sessionChannel);
+        consumeResponse(Attach.class);  // expects the next response to be an Attach...
+        consumeResponse(Flow.class);  // ...followed by a Flow
+        return this;
+    }
+
+    public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
+    {
+        Transfer transfer = createTransactionTransfer(txnState.getHandle());  // new Transfer on the handle held by txnState
+        setPayloadOnTransfer(transfer, new Declare());
+        sendPerformativeAndChainFuture(transfer, _sessionChannel);
+        consumeResponse(Disposition.class);
+        Disposition declareTransactionDisposition = getLatestResponse(Disposition.class);
+        assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
+        assertThat(declareTransactionDisposition.getState(), is(instanceOf(Declared.class)));
+        Binary transactionId = ((Declared) declareTransactionDisposition.getState()).getTxnId();  // txn id is taken from the Declared state in the response
+        assertThat(transactionId, is(notNullValue()));
+        consumeResponse(Flow.class);  // unlike txnDischarge, the Flow is expected directly after the Disposition here
+        txnState.setLastTransactionId(transactionId);  // read back later through getCurrentTransactionId()
+        return this;
+    }
+
+    public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
+    {
+        final Discharge dischargeRequest = new Discharge();
+        dischargeRequest.setTxnId(txnState.getCurrentTransactionId());
+        dischargeRequest.setFail(failed);
+
+        final Transfer dischargeTransfer = createTransactionTransfer(txnState.getHandle());
+        setPayloadOnTransfer(dischargeTransfer, dischargeRequest);
+        sendPerformativeAndChainFuture(dischargeTransfer, _sessionChannel);
+
+        Disposition dischargeDisposition = null;
+        Flow coordinatorFlow = null;
+        do  // keep reading until both a Disposition and a Flow on txnState's handle have been seen, in either order
+        {
+            consumeResponse(Disposition.class, Flow.class);
+            final Object body = getLatestResponse().getBody();
+            if (body instanceof Disposition)
+            {
+                dischargeDisposition = (Disposition) body;
+            }
+            if (body instanceof Flow)
+            {
+                final Flow receivedFlow = (Flow) body;
+                if (receivedFlow.getHandle().equals(txnState.getHandle()))  // flows carrying a different handle are ignored
+                {
+                    coordinatorFlow = receivedFlow;
+                }
+            }
+        } while(dischargeDisposition == null || coordinatorFlow == null);
+
+        assertThat(dischargeDisposition.getSettled(), is(equalTo(true)));
+        assertThat(dischargeDisposition.getState(), is(instanceOf(Accepted.class)));
+
+        txnState.setLastTransactionId(null);  // the transaction is over; the state holder no longer reports an id
+        return this;
+    }
+
+    private Transfer createTransactionTransfer(final UnsignedInteger handle)
+    {
+        Transfer transfer = new Transfer();  // independent of the shared _transfer
+        transfer.setHandle(handle);
+        transfer.setDeliveryId(getNextDeliveryId());  // draws from the same counter that numbered the default _transfer
+        transfer.setDeliveryTag(new Binary(("transaction-" + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8)));  // tag is derived from the delivery id
+        return transfer;
+    }
+
     //////////
     // misc //
     //////////
@@ -567,11 +755,14 @@ public class Interaction
             return this;
         }
         acceptableResponseClasses.remove(null);
-        for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+        if (_latestResponse != null)
         {
-            if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+            for (Class<?> acceptableResponseClass : acceptableResponseClasses)
             {
-                return this;
+                if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+                {
+                    return this;
+                }
             }
         }
         throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
@@ -613,4 +804,58 @@ public class Interaction
         _flow.setHandle(_attach.getHandle());
         return this;
     }
+
+    private UnsignedInteger getNextDeliveryId()
+    {
+        return UnsignedInteger.valueOf(_deliveryIdCounter++);  // post-increment: returns the current counter value, then advances it
+    }
+
+    public Interaction receiveDelivery() throws Exception
+    {
+        _latestDelivery = receiveAllTransfers();
+        return this;
+    }
+
+    public Interaction decodeLatestDelivery() throws AmqpErrorException
+    {
+        MessageDecoder messageDecoder = new MessageDecoder();
+        _latestDelivery.forEach(transfer ->  // NOTE(review): as far as this diff shows, _latestDelivery is null until receiveDelivery() has run
+                                {
+                                    messageDecoder.addTransfer(transfer);
+                                    transfer.dispose();  // each transfer is disposed right after being handed to the decoder
+                                });
+        _decodedLatestDelivery = messageDecoder.getData();
+        _latestDelivery = null;  // getLatestDelivery() returns null from here until the next receiveDelivery()
+        return this;
+    }
+
+    public List<Transfer> getLatestDelivery()
+    {
+        return _latestDelivery;
+    }
+
+    public Object getDecodedLatestDelivery()
+    {
+        return _decodedLatestDelivery;
+    }
+
+    private List<Transfer> receiveAllTransfers() throws Exception
+    {
+        List<Transfer> transfers = new ArrayList<>();
+        boolean hasMore;
+        do  // collect Transfer frames until one arrives whose 'more' flag is not TRUE (a null flag ends the loop too)
+        {
+            Transfer responseTransfer = consumeResponse().getLatestResponse(Transfer.class);
+            hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
+            transfers.add(responseTransfer);  // frames are kept in arrival order
+        }
+        while (hasMore);
+
+        return transfers;
+    }
+
+    public InteractionTransactionalState createTransactionalState(final UnsignedInteger handle)
+    {
+        return new InteractionTransactionalState(handle);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
new file mode 100644
index 0000000..061be92
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public class InteractionTransactionalState  // holder pairing a link handle with the id of the most recently declared transaction
+{
+    private final UnsignedInteger _handle;
+    private Binary _lastTransactionId;  // null until set; Interaction.txnDeclare stores the id, Interaction.txnDischarge resets it to null
+
+    public InteractionTransactionalState(final UnsignedInteger handle)
+    {
+        _handle = handle;
+    }
+
+    public UnsignedInteger getHandle()
+    {
+        return _handle;
+    }
+
+    public void setLastTransactionId(final Binary lastTransactionId)
+    {
+        _lastTransactionId = lastTransactionId;
+    }
+
+    public Binary getCurrentTransactionId()  // note the naming asymmetry: reads the field written by setLastTransactionId
+    {
+        return _lastTransactionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index bfe05b8..446c112 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -30,7 +30,6 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 
 public class Utils
 {
@@ -81,19 +80,11 @@ public class Utils
                                                      .flowNextOutgoingId(UnsignedInteger.ZERO)
                                                      .flowLinkCredit(UnsignedInteger.ONE)
                                                      .flowHandleFromLinkHandle()
-                                                     .flow();
+                                                     .flow()
+                                                     .receiveDelivery()
+                                                     .decodeLatestDelivery();
 
-            MessageDecoder messageDecoder = new MessageDecoder();
-            boolean hasMore;
-            do
-            {
-                Transfer responseTransfer = interaction.consumeResponse().getLatestResponse(Transfer.class);
-                messageDecoder.addTransfer(responseTransfer);
-                hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
-            }
-            while (hasMore);
-
-            return messageDecoder.getData();
+            return interaction.getDecodedLatestDelivery();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
new file mode 100644
index 0000000..2f83a13
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.messaging;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class MultiTransferTest extends ProtocolTestBase
+{
+    private InetSocketAddress _brokerAddress;
+    private String _originalMmsMessageStorePersistence;
+
+    @Before
+    public void setUp()
+    {
+        // setProperty() returns the value it replaces (null when unset); tearDown() restores it
+        _originalMmsMessageStorePersistence = System.setProperty("qpid.tests.mms.messagestore.persistence", "false");
+
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (_originalMmsMessageStorePersistence == null) // the property was not set before setUp() ran
+        {
+            System.clearProperty("qpid.tests.mms.messagestore.persistence");
+        }
+        else
+        {
+            System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.14",
+            description = "For messages that are too large to fit within the maximum frame size, additional data MAY"
+                          + " be transferred in additional transfer frames by setting the more flag on all"
+                          + " but the last transfer frame")
+    public void multiTransferMessage() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            QpidByteBuffer[] payloads = splitPayload("testData", 2); // one chunk per transfer frame
+            final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
+            final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
+
+            Interaction interaction = transport.newInteraction();
+            Disposition disposition = interaction.negotiateProtocol().consumeResponse()
+                                                 .open().consumeResponse(Open.class)
+                                                 .begin().consumeResponse(Begin.class)
+                                                 .attachRole(Role.SENDER)
+                                                 .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                 .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                                                 .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                                                 .attach().consumeResponse(Attach.class)
+                                                 .consumeResponse(Flow.class)
+                                                 .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                                                 .transferDeliveryId(deliveryId)
+                                                 .transferDeliveryTag(deliveryTag)
+                                                 .transferMore(true) // first frame: more data follows
+                                                 .transfer()
+                                                 .sync()
+                                                 .transferMore(false) // second frame completes the delivery
+                                                 .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+                                                 .transfer()
+                                                 .consumeResponse()
+                                                 .getLatestResponse(Disposition.class);
+
+            assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
+            assertThat(disposition.getLast(), isOneOf(null, deliveryId));
+            assertThat(disposition.getSettled(), is(equalTo(false)));
+            assertThat(disposition.getState(), is(instanceOf(Accepted.class))); // outcome check, as in the other multi-transfer tests
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.5",
+            description = "[delivery-id] On continuation transfers the delivery-id MAY be omitted..."
+                          + "[delivery-tag] field MUST be specified for the first transfer of a multi-transfer"
+                          + " message and can only be omitted for continuation transfers.")
+    public void multiTransferMessageOmittingOptionalTagAndID() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            QpidByteBuffer[] payloads = splitPayload("testData", 4); // four chunks, one per transfer frame below
+            final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
+            final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
+
+            Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+                       .transferDeliveryId(deliveryId) // frame 1: delivery-id and delivery-tag both supplied
+                       .transferDeliveryTag(deliveryTag)
+                       .transferMore(true)
+                       .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                       .transfer()
+                       .sync()
+                       .transferDeliveryId(deliveryId) // frame 2: delivery-id kept, delivery-tag omitted
+                       .transferDeliveryTag(null)
+                       .transferMore(true)
+                       .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+                       .transfer()
+                       .sync()
+                       .transferDeliveryId(null) // frame 3: delivery-id omitted, delivery-tag kept
+                       .transferDeliveryTag(deliveryTag)
+                       .transferMore(true)
+                       .setPayloadOnTransfer(Collections.singletonList(payloads[2]))
+                       .transfer()
+                       .sync()
+                       .transferDeliveryId(null) // frame 4: both omitted; more=false ends the delivery
+                       .transferDeliveryTag(null)
+                       .transferMore(false)
+                       .setPayloadOnTransfer(Collections.singletonList(payloads[3]))
+                       .transfer()
+                       .consumeResponse();
+
+            Disposition disposition = interaction.getLatestResponse(Disposition.class); // the response consumed at the end of the chain
+
+            assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
+            assertThat(disposition.getLast(), isOneOf(null, deliveryId));
+            assertThat(disposition.getSettled(), is(equalTo(false)));
+            assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
+        }
+    }
+
+
+    //
+
+    @Test
+    @SpecificationTest(section = "2.6.14",
+            description = "The sender MAY indicate an aborted attempt to deliver a message by setting the abort flag on the last transfer."
+                          + " In this case the receiver MUST discard the message data that was transferred prior to the abort.")
+    public void abortMultiTransferMessage() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            QpidByteBuffer[] payloads = splitPayload("testData", 2); // only payloads[0] is sent before the abort
+
+            final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
+            final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
+
+            Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+                       .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                       .transferDeliveryId(deliveryId)
+                       .transferDeliveryTag(deliveryTag)
+                       .transferMore(true)
+                       .transfer()
+                       .sync()
+                       .setPayloadOnTransfer(null) // the aborting frame carries no payload
+                       .transferMore(null)
+                       .transferAborted(true)
+                       .transfer();
+
+            Response<?> latestResponse = interaction.consumeResponse(new Class<?>[] {null}).getLatestResponse(); // presumably a null entry means "no response" is acceptable - TODO confirm
+            assertThat(latestResponse, is(nullValue())); // nothing may come back for the aborted delivery
+        }
+    }
+    @Test
+    @SpecificationTest(section = "2.6.14",
+            description = "[...]messages being transferred along different links MAY be interleaved")
+    public void multiTransferInterleaved() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            QpidByteBuffer[] firstMessageChunks = splitPayload("testData1", 2);
+            QpidByteBuffer[] secondMessageChunks = splitPayload("testData2", 2);
+
+            UnsignedInteger firstLinkHandle = UnsignedInteger.ZERO;
+            UnsignedInteger secondLinkHandle = UnsignedInteger.ONE;
+            Binary firstDeliveryTag = new Binary("testTransfer1".getBytes(UTF_8));
+            Binary secondDeliveryTag = new Binary("testTransfer2".getBytes(UTF_8));
+            UnsignedInteger firstDeliveryId = UnsignedInteger.ZERO;
+            UnsignedInteger secondDeliveryId = UnsignedInteger.ONE;
+
+            Interaction interaction = transport.newInteraction();
+
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       // attach the first sending link
+                       .attachName("testLink1")
+                       .attachHandle(firstLinkHandle)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+                       // attach the second sending link
+                       .attachName("testLink2")
+                       .attachHandle(secondLinkHandle)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+                       // link 1: first half of message 1
+                       .transferHandle(firstLinkHandle)
+                       .transferDeliveryId(firstDeliveryId)
+                       .transferDeliveryTag(firstDeliveryTag)
+                       .transferMore(true)
+                       .setPayloadOnTransfer(Collections.singletonList(firstMessageChunks[0]))
+                       .transfer()
+                       .sync()
+                       // link 2: first half of message 2, sent while message 1 is still open
+                       .transferHandle(secondLinkHandle)
+                       .transferDeliveryId(secondDeliveryId)
+                       .transferDeliveryTag(secondDeliveryTag)
+                       .transferMore(true)
+                       .setPayloadOnTransfer(Collections.singletonList(secondMessageChunks[0]))
+                       .transfer()
+                       .sync()
+                       // link 1: second half completes message 1
+                       .transferHandle(firstLinkHandle)
+                       .transferDeliveryId(firstDeliveryId)
+                       .transferDeliveryTag(firstDeliveryTag)
+                       .transferMore(false)
+                       .setPayloadOnTransfer(Collections.singletonList(firstMessageChunks[1]))
+                       .transfer()
+                       .sync()
+                       // link 2: second half completes message 2
+                       .transferHandle(secondLinkHandle)
+                       .transferDeliveryId(secondDeliveryId)
+                       .transferDeliveryTag(secondDeliveryTag)
+                       .transferMore(false)
+                       .setPayloadOnTransfer(Collections.singletonList(secondMessageChunks[1]))
+                       .transfer()
+                       .sync();
+
+            Map<UnsignedInteger, Disposition> dispositionsByDeliveryId = new HashMap<>();
+            for (int outstanding = 2; outstanding > 0; outstanding--)
+            {
+                Disposition received = interaction.consumeResponse(Disposition.class)
+                                                  .getLatestResponse(Disposition.class);
+                dispositionsByDeliveryId.put(received.getFirst(), received);
+
+                assertThat(received.getLast(), isOneOf(null, received.getFirst()));
+                assertThat(received.getSettled(), is(equalTo(false)));
+                assertThat(received.getState(), is(instanceOf(Accepted.class)));
+            }
+
+            assertThat("Unexpected number of dispositions", dispositionsByDeliveryId.size(), equalTo(2));
+            assertThat(dispositionsByDeliveryId.containsKey(firstDeliveryId), is(true));
+            assertThat(dispositionsByDeliveryId.containsKey(secondDeliveryId), is(true));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.14",
+            description = "[...]messages transferred along a single link MUST NOT be interleaved")
+    public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception
+    {
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            QpidByteBuffer[] firstMessageChunks = splitPayload("testData1", 2);
+            QpidByteBuffer[] secondMessageChunks = splitPayload("testData2", 2);
+
+            Binary firstDeliveryTag = new Binary("testTransfer1".getBytes(UTF_8));
+            Binary secondDeliveryTag = new Binary("testTransfer2".getBytes(UTF_8));
+            UnsignedInteger firstDeliveryId = UnsignedInteger.ZERO;
+            UnsignedInteger secondDeliveryId = UnsignedInteger.ONE;
+
+            Interaction interaction = transport.newInteraction();
+
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class)
+                       // a single sending link is attached
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class)
+                       // message 1 is started and left incomplete (more=true)
+                       .transferDeliveryId(firstDeliveryId)
+                       .transferDeliveryTag(firstDeliveryTag)
+                       .transferMore(true)
+                       .setPayloadOnTransfer(Collections.singletonList(firstMessageChunks[0]))
+                       .transfer()
+                       .sync()
+                       // message 2 is started on the same link before message 1 has finished
+                       .transferDeliveryId(secondDeliveryId)
+                       .transferDeliveryTag(secondDeliveryTag)
+                       .transferMore(true)
+                       .setPayloadOnTransfer(Collections.singletonList(secondMessageChunks[0]))
+                       .transfer()
+                       .sync();
+
+            interaction.consumeResponse(Detach.class, End.class, Close.class); // presumably accepts any one of these as the broker's reaction - TODO confirm
+        }
+    }
+
+    private QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts) // encodes messageContent as a message and cuts the encoded bytes into numberOfParts buffers
+    {
+        MessageEncoder messageEncoder = new MessageEncoder();
+        final Header header = new Header();
+        messageEncoder.setHeader(header); // no header fields are set
+        messageEncoder.addData(messageContent);
+        List<QpidByteBuffer> payload = messageEncoder.getPayload();
+        long size = QpidByteBufferUtils.remaining(payload); // presumably the total bytes remaining across all payload buffers - TODO confirm
+
+        QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
+        int chunkSize = (int) size / numberOfParts; // integer division; the remainder ends up in the last chunk
+        int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1);
+        for (int i = 0; i < numberOfParts; i++)
+        {
+            result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize);
+        }
+
+        int currentBufferIndex = 0;
+        for (QpidByteBuffer p : payload)
+        {
+            final int limit = p.limit(); // remembered so it can be restored after each temporary narrowing
+
+            while (p.hasRemaining())
+            {
+                QpidByteBuffer currentBuffer = result[currentBufferIndex];
+                if (currentBuffer.hasRemaining())
+                {
+                    int length = Math.min(p.remaining(), currentBuffer.remaining());
+                    p.limit(p.position() + length); // narrow p so the slice spans exactly `length` bytes
+                    currentBuffer.put(p.slice()); // NOTE(review): the slice is never released here - confirm whether QpidByteBuffer slices need disposing
+                    p.position(p.position() + length); // advance p past the bytes just copied
+                    p.limit(limit);
+                }
+
+                if (!currentBuffer.hasRemaining())
+                {
+                    currentBuffer.flip(); // chunk is full: flip it and continue with the next chunk
+                    currentBufferIndex++;
+                }
+            }
+        }
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index cb6adbe..6ff5d5f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -20,15 +20,20 @@
 
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
 
 import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.core.Is;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -38,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Received;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -45,14 +51,18 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
 import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
 import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
 import org.apache.qpid.tests.protocol.v1_0.Response;
@@ -60,6 +70,7 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
 
 public class TransferTest extends ProtocolTestBase
 {
+    public static final String TEST_MESSAGE_DATA = "foo";
     private InetSocketAddress _brokerAddress;
     private String _originalMmsMessageStorePersistence;
 
@@ -109,7 +120,6 @@ public class TransferTest extends ProtocolTestBase
         }
     }
 
-    @Ignore("QPID-7816")
     @Test
     @SpecificationTest(section = "2.7.5",
             description = "[delivery-tag] MUST be specified for the first transfer "
@@ -118,21 +128,18 @@ public class TransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            Close responseClose = transport.newInteraction()
-                                           .negotiateProtocol().consumeResponse()
-                                           .open().consumeResponse(Open.class)
-                                           .begin().consumeResponse(Begin.class)
-                                           .attachRole(Role.SENDER)
-                                           .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
-                                           .attach().consumeResponse(Attach.class)
-                                           .consumeResponse(Flow.class)
-                                           .transferDeliveryTag(null)
-                                           .transferPayloadData("testData")
-                                           .transfer()
-                                           .consumeResponse()
-                                           .getLatestResponse(Close.class);
-            assertThat(responseClose.getError(), is(notNullValue()));
-            assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.INVALID_FIELD));
+            Interaction interaction = transport.newInteraction()
+                                                 .negotiateProtocol().consumeResponse()
+                                                 .open().consumeResponse(Open.class)
+                                                 .begin().consumeResponse(Begin.class)
+                                                 .attachRole(Role.SENDER)
+                                                 .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                 .attach().consumeResponse(Attach.class)
+                                                 .consumeResponse(Flow.class)
+                                                 .transferDeliveryTag(null)
+                                                 .transferPayloadData("testData")
+                                                 .transfer();
+            interaction.consumeResponse(Detach.class, End.class, Close.class);
         }
     }
 
@@ -237,7 +244,7 @@ public class TransferTest extends ProtocolTestBase
                        .sync();
 
             final byte[] protocolResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
-            assertThat(protocolResponse, is(equalTo("AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8))));
+            assertThat(protocolResponse, is(equalTo("AMQP\0\1\0\0".getBytes(UTF_8))));
 
             interaction.consumeResponse().getLatestResponse(Open.class);
             interaction.consumeResponse().getLatestResponse(Begin.class);
@@ -275,7 +282,7 @@ public class TransferTest extends ProtocolTestBase
                                                                                    Rejected.REJECTED_SYMBOL)
                                                              .attach().consumeResponse(Attach.class)
                                                              .consumeResponse(Flow.class)
-                                                             .transferPayload(messageEncoder.getPayload())
+                                                             .setPayloadOnTransfer(messageEncoder.getPayload())
                                                              .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                              .transfer()
                                                              .consumeResponse()
@@ -320,7 +327,7 @@ public class TransferTest extends ProtocolTestBase
                                                   .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                                                   .attach().consumeResponse(Attach.class)
                                                   .consumeResponse(Flow.class)
-                                                  .transferPayload(messageEncoder.getPayload())
+                                                  .setPayloadOnTransfer(messageEncoder.getPayload())
                                                   .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                   .transfer()
                                                   .consumeResponse()
@@ -345,4 +352,302 @@ public class TransferTest extends ProtocolTestBase
             }
         }
     }
+
+    /** Receives one message over a link attached without explicit settle modes and checks the decoded body. */
+    @Test
+    @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+    public void receiveTransferUnsettled() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse()
+                       .begin().consumeResponse();
+            interaction.attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach().consumeResponse();
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            // Feed each Transfer frame to the decoder until a frame arrives without "more" set.
+            final MessageDecoder decoder = new MessageDecoder();
+            Transfer frame;
+            do
+            {
+                frame = interaction.consumeResponse().getLatestResponse(Transfer.class);
+                decoder.addTransfer(frame);
+            }
+            while (Boolean.TRUE.equals(frame.getMore()));
+            final Object payload = decoder.getData();
+            assertThat(payload, is(equalTo(TEST_MESSAGE_DATA)));
+        }
+    }
+
+    /** Receives a message on a link attached with receiver settle mode FIRST, then sends a settled disposition. */
+    @Test
+    @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+    public void receiveTransferReceiverSettleFirst() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse()
+                       .begin().consumeResponse();
+            interaction.attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                       .attach().consumeResponse();
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            final Object payload = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(payload, is(equalTo(TEST_MESSAGE_DATA)));
+
+            // Send a settled disposition in the receiver role; no response is consumed for it here.
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .disposition();
+
+            // verify that no unexpected performative is received by closing
+            transport.doCloseConnection();
+        }
+    }
+
+    /** Receives with receiver settle mode SECOND, accepts unsettled, and expects a settled disposition in reply. */
+    @Test
+    @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+    public void receiveTransferReceiverSettleSecond() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse()
+                       .begin().consumeResponse();
+            interaction.attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attach().consumeResponse();
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            final Object payload = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(payload, is(equalTo(TEST_MESSAGE_DATA)));
+
+            // Accept without settling, then read the Disposition the broker sends back.
+            interaction.dispositionSettled(false)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(new Accepted())
+                       .disposition()
+                       .consumeResponse(Disposition.class);
+            final Disposition reply = interaction.getLatestResponse(Disposition.class);
+            assertThat(reply.getSettled(), is(true));
+
+            // NOTE(review): the null entry presumably allows "no further frame" next to a Flow - confirm in Interaction.
+            interaction.consumeResponse(null, Flow.class);
+        }
+    }
+
+    /** Receives with receiver settle mode SECOND, rejects unsettled, and expects a settled disposition in reply. */
+    @Test
+    @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+    public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse()
+                       .begin().consumeResponse();
+            interaction.attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attach().consumeResponse();
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            final Object payload = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(payload, is(equalTo(TEST_MESSAGE_DATA)));
+
+            // The first frame back may be a Disposition or a Flow; if it is a Flow, the Disposition is read next.
+            interaction.dispositionSettled(false)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(new Rejected())
+                       .disposition()
+                       .consumeResponse(Disposition.class, Flow.class);
+            if (interaction.getLatestResponse().getBody() instanceof Flow)
+            {
+                interaction.consumeResponse(Disposition.class);
+            }
+
+            final Disposition reply = interaction.getLatestResponse(Disposition.class);
+            assertThat(reply.getSettled(), is(true));
+
+            interaction.consumeResponse(null, Flow.class);
+        }
+    }
+
+    /** Currently ignored: sends an unsettled disposition with a null state and expects a settled disposition back. */
+    @Ignore
+    @Test
+    @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+    public void receiveTransferReceiverSettleSecondWithImplicitDispositionState() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse()
+                       .begin().consumeResponse();
+            // The source is attached with an empty outcomes list and a null default outcome.
+            interaction.attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSourceOutcomes()
+                       .attachSourceDefaultOutcome(null)
+                       .attach().consumeResponse();
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            final Object payload = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(payload, is(equalTo(TEST_MESSAGE_DATA)));
+
+            // Send an unsettled disposition that carries no delivery state, then read the broker's Disposition.
+            interaction.dispositionSettled(false)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(null)
+                       .disposition()
+                       .consumeResponse(Disposition.class);
+            final Disposition reply = interaction.getLatestResponse(Disposition.class);
+            assertThat(reply.getSettled(), is(true));
+
+            interaction.consumeResponse(null, Flow.class);
+        }
+    }
+
+    /** Sends a Received disposition before the Accepted one and expects a settled disposition in reply. */
+    @Test
+    @SpecificationTest(section = "2.6.12", description = "[...] the receiving application MAY wish to indicate"
+                                                         + " non-terminal delivery states to the sender")
+    public void receiveTransferReceiverIndicateNonTerminalDeliveryState() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse()
+                       .begin().consumeResponse();
+            interaction.attachRole(Role.RECEIVER)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attach().consumeResponse();
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            final Object payload = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(payload, is(equalTo(TEST_MESSAGE_DATA)));
+
+            // Report the Received state first, then Accepted; both dispositions are sent unsettled.
+            interaction.dispositionSettled(false)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(new Received())
+                       .disposition();
+
+            interaction.dispositionSettled(false)
+                       .dispositionRole(Role.RECEIVER)
+                       .dispositionState(new Accepted())
+                       .disposition().consumeResponse(Disposition.class);
+            final Disposition reply = interaction.getLatestResponse(Disposition.class);
+            assertThat(reply.getSettled(), is(true));
+
+            interaction.consumeResponse(null, Flow.class);
+        }
+    }
+
+
+    /** Requests sender settle mode SETTLED on attach and, if the broker echoes it, expects a settled delivery. */
+    @Test
+    @SpecificationTest(section = "2.7.3", description = "The sender SHOULD respect the receiver’s desired settlement mode if "
+                                                        + "the receiver initiates the attach exchange and the sender supports the desired mode.")
+    public void receiveTransferSenderSettleModeSettled() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                     .attachSndSettleMode(SenderSettleMode.SETTLED)
+                                                     .attach().consumeResponse(Attach.class);
+            Attach attach = interaction.getLatestResponse(Attach.class);
+            // An assumption, not an assertion: the rest only applies when the broker's Attach echoes SETTLED.
+            assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.SETTLED)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowNextIncomingId(UnsignedInteger.ZERO)
+                       .flowOutgoingWindow(UnsignedInteger.ZERO)
+                       .flowNextOutgoingId(UnsignedInteger.ZERO)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            // The delivery counts as settled when at least one of its Transfer frames has settled == true.
+            List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
+            final boolean isSettled = transfers.stream().anyMatch(transfer -> Boolean.TRUE.equals(transfer.getSettled()));
+            assertThat(isSettled, is(true));
+
+            // verify no unexpected performative received by closing the connection
+            transport.doCloseConnection();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index e2f8b5e..f63d85e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -143,7 +143,7 @@ public class DischargeTest extends ProtocolTestBase
             final Detach detachResponse = interaction.transferDeliveryId(UnsignedInteger.ONE)
                                                                 .transferDeliveryTag(new Binary("discharge".getBytes(UTF_8)))
                                                                 .transferPayloadData(discharge)
-                                                                .transfer().consumeResponse()
+                                                                .transfer().consumeResponse(Detach.class)
                                                                 .getLatestResponse(Detach.class);
             Error error = detachResponse.getError();
             assertThat(error, is(notNullValue()));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org