You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/30 16:19:53 UTC

[1/2] qpid-broker-j git commit: QPID-7649: [Java Broker] [AMQP1.0] Add support for Attach with incomplete-unsettled

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 72ed1aa52 -> 737c52807


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
new file mode 100644
index 0000000..e85cbc3
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.typeCompatibleWith;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+
+public class ResumeDeliveriesTest extends ProtocolTestBase
+{
+    private static final int MIN_MAX_FRAME_SIZE = 512;
+    private static final String TEST_MESSAGE_CONTENT = "foo";
+    private InetSocketAddress _brokerAddress;
+    private String _originalMmsMessageStorePersistence;
+
+    /**
+     * Forces the "qpid.tests.mms.messagestore.persistence" system property to "false" for the duration of
+     * the test (remembering the prior value), creates the test queue and looks up the anonymous AMQP
+     * broker address used by the tests.
+     */
+    @Before
+    public void setUp()
+    {
+        final String persistenceKey = "qpid.tests.mms.messagestore.persistence";
+        // System.setProperty hands back the previous value, or null when the property was not set
+        _originalMmsMessageStorePersistence = System.setProperty(persistenceKey, "false");
+
+        getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+        _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+    }
+
+    @After
+    public void tearDown()
+    {
+        if (_originalMmsMessageStorePersistence != null)
+        {
+            System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
+        }
+        else
+        {
+            System.clearProperty("qpid.tests.mms.messagestore.persistence");
+        }
+    }
+
+    /**
+     * Sends a single delivery on a sending link attached with rcv-settle-mode=second, detaches the link,
+     * then re-attaches declaring that delivery tag as unsettled. The peer's attach must carry exactly that
+     * tag in its unsettled map, with a delivery state whose type is compatible with the state the peer
+     * reported in its disposition before the detach, and must not flag the map as incomplete.
+     */
+    @Test
+    @SpecificationTest(section = "2.6.13",
+            description = "When a suspended link having unsettled deliveries is resumed, the unsettled field from the"
+                          + " attach frame will carry the delivery-tags and delivery state of all deliveries"
+                          + " considered unsettled by the issuing link endpoint.")
+    public void resumeSendingLinkSingleUnsettledDelivery() throws Exception
+    {
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8));
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class);
+
+            // 1. attach with ReceiverSettleMode.SECOND
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            // 2. send an unsettled delivery
+            final Disposition responseDisposition = interaction.transferHandle(linkHandle1)
+                                                               .transferDeliveryId(UnsignedInteger.ZERO)
+                                                               .transferDeliveryTag(deliveryTag)
+                                                               .transferPayloadData(TEST_MESSAGE_CONTENT)
+                                                               .transfer()
+                                                               .consumeResponse()
+                                                               .getLatestResponse(Disposition.class);
+            assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+            assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+            final DeliveryState remoteDeliveryState = responseDisposition.getState();
+            // the state's class is compared in step 5; fail with a clear message instead of an NPE there
+            assertThat(remoteDeliveryState, is(notNullValue()));
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Attach responseAttach2 = interaction.attachHandle(linkHandle2)
+                                                      .attachUnsettled(Collections.singletonMap(deliveryTag, null))
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+
+            // 5. assert content of unsettled map
+            assertThat(responseAttach2.getTarget(), is(notNullValue()));
+            final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+            assertThat(remoteUnsettled, is(notNullValue()));
+            assertThat(remoteUnsettled.keySet(), is(equalTo(Collections.singleton(deliveryTag))));
+            final DeliveryState resumedDeliveryState = remoteUnsettled.get(deliveryTag);
+            // the key is present (asserted above) but its value could still be null
+            assertThat(resumedDeliveryState, is(notNullValue()));
+            assertThat(resumedDeliveryState.getClass(), typeCompatibleWith(remoteDeliveryState.getClass()));
+            assertThat(responseAttach2.getIncompleteUnsettled(), is(anyOf(nullValue(), equalTo(false))));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.3",
+            description = "If the local unsettled map is too large to be encoded within a frame of the agreed maximum"
+                          + " frame size then the session MAY be ended with the frame-size-too-small error. The"
+                          + " endpoint SHOULD make use of the ability to send an incomplete unsettled map (see below)"
+                          + " to avoid sending an error.")
+    public void resumeSendingLinkWithIncompleteUnsettled() throws Exception
+    {
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse();
+
+            // 0. Open connection with small max-frame-size
+            final Open open = interaction.openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+                                         .open().consumeResponse()
+                                         .getLatestResponse(Open.class);
+            interaction.begin().consumeResponse(Begin.class);
+
+            // 1. attach with ReceiverSettleMode.SECOND
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            // 2. send enough unsettled deliveries to cause incomplete-unsettled to be true
+            //    assume each delivery requires at least 1 byte, therefore max-frame-size deliveries should be enough
+            interaction.transferHandle(linkHandle1)
+                       .transferPayloadData(TEST_MESSAGE_CONTENT);
+            Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
+            for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
+            {
+                final Binary deliveryTag = new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+                final Disposition responseDisposition = interaction.transferDeliveryId(UnsignedInteger.valueOf(i))
+                                                                   .transferDeliveryTag(deliveryTag)
+                                                                   .transfer()
+                                                                   .consumeResponse(Disposition.class)
+                                                                   .getLatestResponse(Disposition.class);
+                assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+                assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+                localUnsettled.put(deliveryTag, null);
+            }
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+            Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+                                                                            localUnsettled.get(sampleLocalUnsettled));
+            final Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+                                                          .attachUnsettled(unsettled)
+                                                          .attachIncompleteUnsettled(true)
+                                                          .attach().consumeResponse(End.class, Attach.class)
+                                                          .getLatestResponse();
+
+            if (latestResponse.getBody() instanceof End)
+            {
+                // 5.a assert session end error
+                final End responseEnd = (End) latestResponse.getBody();
+                final Error error = responseEnd.getError();
+                assertThat(error, is(notNullValue()));
+                assertThat(error.getCondition().getValue(), is(equalTo(AmqpError.FRAME_SIZE_TOO_SMALL)));
+            }
+            else if (latestResponse.getBody() instanceof Attach)
+            {
+                // 5.b assert content of unsettled map
+                final Attach responseAttach2 = (Attach) latestResponse.getBody();
+                assertThat(responseAttach2.getTarget(), is(notNullValue()));
+                final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+                assertThat(remoteUnsettled, is(notNullValue()));
+                assertThat(remoteUnsettled.keySet(), is(not(empty())));
+                for (Binary deliveryTag : remoteUnsettled.keySet())
+                {
+                    assertThat(deliveryTag, isIn(localUnsettled.keySet()));
+                }
+                assertThat(responseAttach2.getIncompleteUnsettled(), is(equalTo(true)));
+            }
+            else
+            {
+                fail(String.format("Unexpected response. Expected End or Attach. Got '%s'.", latestResponse.getBody()));
+            }
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.3", description =
+            "If set to true [incomplete-unsettled] indicates that the unsettled map provided is not complete. "
+            + "When the map is incomplete the recipient of the map cannot take the absence of a delivery tag from "
+            + "the map as evidence of settlement. On receipt of an incomplete unsettled map a sending endpoint MUST "
+            + "NOT send any new deliveries (i.e. deliveries where resume is not set to true) to its partner (and "
+            + "a receiving endpoint which sent an incomplete unsettled map MUST detach with an error on "
+            + "receiving a transfer which does not have the resume flag set to true).")
+    public void rejectNewDeliveryWhilstUnsettledIncomplete() throws Exception
+    {
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse();
+
+            // 0. Open connection with small max-frame-size
+            final Open open = interaction.openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+                                         .open().consumeResponse()
+                                         .getLatestResponse(Open.class);
+            interaction.begin().consumeResponse(Begin.class);
+
+            // 1. attach with ReceiverSettleMode.SECOND
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class);
+
+            // 2. send enough unsettled deliverys to cause incomplete-unsettled to be true
+            //    assume each delivery requires at least 1 byte, therefore max-frame-size deliveries should be enough
+            interaction.transferHandle(linkHandle1)
+                       .transferPayloadData(TEST_MESSAGE_CONTENT);
+            Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
+            for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
+            {
+                final Binary deliveryTag = new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+                final Disposition responseDisposition = interaction.transferDeliveryId(UnsignedInteger.valueOf(i))
+                                                                   .transferDeliveryTag(deliveryTag)
+                                                                   .transfer()
+                                                                   .consumeResponse(Disposition.class)
+                                                                   .getLatestResponse(Disposition.class);
+                assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+                assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+                localUnsettled.put(deliveryTag, null);
+            }
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+            Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+                                                                            localUnsettled.get(sampleLocalUnsettled));
+            final Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+                                                          .attachUnsettled(unsettled)
+                                                          .attachIncompleteUnsettled(true)
+                                                          .attach().consumeResponse(End.class, Attach.class)
+                                                          .getLatestResponse();
+            assumeThat(latestResponse.getBody(), is(instanceOf(Attach.class)));
+
+            // 5. ensure attach has incomplete-unsettled
+            final Attach responseAttach = (Attach) latestResponse.getBody();
+            assertThat(responseAttach.getIncompleteUnsettled(), is(equalTo(true)));
+
+            // 6. send new transfer
+            final Binary newDeliveryTag = new Binary("newTransfer".getBytes(StandardCharsets.UTF_8));
+            final Detach detachWithError = interaction.transferHandle(linkHandle2)
+                                                      .transferDeliveryId(UnsignedInteger.ONE)
+                                                      .transferDeliveryTag(newDeliveryTag)
+                                                      .transfer().consumeResponse()
+                                                      .getLatestResponse(Detach.class);
+            assertThat(detachWithError.getError(), is(notNullValue()));
+            final Error detachError = detachWithError.getError();
+            assertThat(detachError.getCondition(), is(equalTo(AmqpError.ILLEGAL_STATE)));
+        }
+    }
+
+    /**
+     * Receiving-side counterpart of the incomplete-unsettled tests (currently ignored, see QPID-7845).
+     * Puts {@code MIN_MAX_FRAME_SIZE} messages on the queue, receives them all unsettled on a receiving
+     * link, detaches, and resumes the link with a one-entry unsettled map flagged incomplete. The test is
+     * skipped (JUnit assumption) unless the peer answers with an Attach. That Attach must carry a non-empty
+     * subset of the received tags and be flagged incomplete, and the next transfer received after granting
+     * credit must be for the delivery tag that was declared unsettled.
+     */
+    @Ignore("QPID-7845")
+    @Test
+    @SpecificationTest(section = "2.7.3", description =
+            "If set to true [incomplete-unsettled] indicates that the unsettled map provided is not complete. "
+            + "When the map is incomplete the recipient of the map cannot take the absence of a delivery tag from "
+            + "the map as evidence of settlement. On receipt of an incomplete unsettled map a sending endpoint MUST "
+            + "NOT send any new deliveries (i.e. deliveries where resume is not set to true) to its partner (and "
+            + "a receiving endpoint which sent an incomplete unsettled map MUST detach with an error on "
+            + "receiving a transfer which does not have the resume flag set to true).")
+    public void incompleteUnsettledReceiving() throws Exception
+    {
+        for (int i = 0; i < MIN_MAX_FRAME_SIZE; i++)
+        {
+            getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT + "-" + i);
+        }
+
+        // _brokerAddress is the anonymous AMQP address looked up in setUp()
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            // 1. open with small max-frame=512, begin, attach receiver
+            //    with rcv-settle-mode=second, snd-settle-mode=unsettled,
+            //    flow with incoming-window=MAX_INTEGER and link-credit=MAX_INTEGER
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol()
+                       .consumeResponse()
+                       .openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+                       .open()
+                       .consumeResponse()
+                       .begin()
+                       .consumeResponse(Begin.class)
+                       .attachRole(Role.RECEIVER)
+                       .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+                       .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                       .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            final Attach attach = interaction.getLatestResponse(Attach.class);
+            // skip the test when the peer did not agree to send unsettled
+            assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+                       .flowLinkCredit(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+                       .flowHandleFromLinkHandle()
+                       .flow();
+
+            // 2. Receive transfers; Flow and Disposition frames interleaved with them are skipped
+            final Map<Binary, DeliveryState> localUnsettled = new HashMap<>();
+            int receivedTransfers = 0;
+            while (receivedTransfers < MIN_MAX_FRAME_SIZE)
+            {
+                final Response<?> response = interaction.consumeResponse().getLatestResponse();
+                assertThat(response, is(notNullValue()));
+
+                final Object body = response.getBody();
+                if (body instanceof Transfer)
+                {
+                    final Transfer responseTransfer = (Transfer) body;
+                    assertThat(responseTransfer.getMore(), is(not(equalTo(true))));
+                    assertThat(responseTransfer.getSettled(), is(not(equalTo(true))));
+                    localUnsettled.putIfAbsent(responseTransfer.getDeliveryTag(), responseTransfer.getState());
+                    receivedTransfers++;
+                }
+                else if (!(body instanceof Flow || body instanceof Disposition))
+                {
+                    fail("Unexpected frame " + body);
+                }
+            }
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link, declaring a single unsettled delivery and flagging the map incomplete
+            final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+            final Map<Binary, DeliveryState> unsettled =
+                    Collections.singletonMap(sampleLocalUnsettled, localUnsettled.get(sampleLocalUnsettled));
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+                                                          .attachUnsettled(unsettled)
+                                                          .attachIncompleteUnsettled(true)
+                                                          .attach().consumeResponse(End.class, Attach.class)
+                                                          .getLatestResponse();
+            assumeThat(latestResponse.getBody(), is(instanceOf(Attach.class)));
+
+            // 5. assert content of the peer's unsettled map
+            final Attach resumingAttach = (Attach) latestResponse.getBody();
+            final Map<Binary, DeliveryState> remoteUnsettled = resumingAttach.getUnsettled();
+            assertThat(remoteUnsettled, is(notNullValue()));
+            assertThat(remoteUnsettled.keySet(), is(not(empty())));
+            for (Binary deliveryTag : remoteUnsettled.keySet())
+            {
+                assertThat(deliveryTag, isIn(localUnsettled.keySet()));
+            }
+            assertThat(resumingAttach.getIncompleteUnsettled(), is(equalTo(true)));
+
+            // 6. send a flow on the resumed link and wait for the transfer of the declared delivery
+            interaction.flowHandle(linkHandle2).flow();
+
+            boolean received = false;
+            while (!received)
+            {
+                final Response<?> nextResponse = interaction.consumeResponse().getLatestResponse();
+                assertThat(nextResponse, is(notNullValue()));
+
+                final Object body = nextResponse.getBody();
+                if (body instanceof Transfer)
+                {
+                    final Transfer responseTransfer = (Transfer) body;
+                    assertThat(responseTransfer.getMore(), is(not(equalTo(true))));
+                    assertThat(responseTransfer.getSettled(), is(not(equalTo(true))));
+                    assertThat(responseTransfer.getDeliveryTag(), is(equalTo(sampleLocalUnsettled)));
+                    received = true;
+                }
+                else if (!(body instanceof Flow || body instanceof Disposition))
+                {
+                    fail("Unexpected frame " + body);
+                }
+            }
+
+            transport.doCloseConnection();
+        }
+    }
+
+    /**
+     * Receives and settles a single message on a receiving link attached with rcv-settle-mode=first,
+     * detaches without closing, then re-attaches with an empty unsettled map and checks that the peer's
+     * attach carries an empty unsettled map as well.
+     */
+    @Test
+    @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
+                                                         + " the unsettled field from the attach frame will carry"
+                                                         + " the delivery-tags and delivery state of all deliveries"
+                                                         + " considered unsettled by the issuing link endpoint.")
+    public void resumeReceivingLinkEmptyUnsettled() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            // 1. attach a receiver, grant one credit and receive the message
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                     .attach().consumeResponse()
+                                                     .flowIncomingWindow(UnsignedInteger.ONE)
+                                                     .flowLinkCredit(UnsignedInteger.ONE)
+                                                     .flowHandleFromLinkHandle()
+                                                     .flow()
+                                                     .receiveDelivery()
+                                                     .decodeLatestDelivery();
+
+            final Object data = interaction.getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            // 2. settle the delivery so nothing is left unsettled on this side
+            interaction.dispositionSettled(true)
+                       .dispositionRole(Role.RECEIVER)
+                       .disposition();
+
+            // 3. detach the link; the peer must not close it
+            final Detach detach = interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+            assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false)));
+
+            // 4. resume the link with an empty unsettled map
+            interaction.attachUnsettled(new HashMap<>())
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            final Attach attach = interaction.getLatestResponse(Attach.class);
+
+            // 5. assert the peer's unsettled map is present and empty
+            final Map<Binary, DeliveryState> unsettled = attach.getUnsettled();
+            // a missing map previously surfaced as an NPE on the next line; report it as an assertion failure
+            assertThat(unsettled, is(notNullValue()));
+            assertThat(unsettled.entrySet(), empty());
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
+                                                         + " the unsettled field from the attach frame will carry"
+                                                         + " the delivery-tags and delivery state of all deliveries"
+                                                         + " considered unsettled by the issuing link endpoint.")
+    public void resumeReceivingLinkWithSingleUnsettledAccepted() throws Exception
+    {
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                     .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                                                     .attach().consumeResponse();
+
+            Attach attach = interaction.getLatestResponse(Attach.class);
+            assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED)));
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery();
+
+            List<Transfer> transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer transfer = transfers.get(0);
+
+            Binary deliveryTag = transfer.getDeliveryTag();
+            assertThat(deliveryTag, is(notNullValue()));
+            assertThat(transfer.getSettled(), is(not(equalTo(true))));
+            Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            Detach detach = interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+            assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false)));
+
+            interaction.attachUnsettled(Collections.singletonMap(deliveryTag, new Accepted()))
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            Attach resumeAttach = interaction.getLatestResponse(Attach.class);
+
+            Map<Binary, DeliveryState> unsettled = resumeAttach.getUnsettled();
+            assertThat(unsettled, is(notNullValue()));
+            assertThat(unsettled.entrySet(), hasSize(1));
+            Map.Entry<Binary, DeliveryState> entry = unsettled.entrySet().iterator().next();
+            assertThat(entry.getKey(), is(equalTo(deliveryTag)));
+
+            interaction.flowNextIncomingId(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery();
+
+            transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer resumeTransfer = transfers.get(0);
+
+            assertThat(resumeTransfer.getResume(), is(equalTo(true)));
+            assertThat(resumeTransfer.getDeliveryTag(), is(equalTo(deliveryTag)));
+            assertThat(resumeTransfer.getPayload(), is(nullValue()));
+
+            if (!Boolean.TRUE.equals(resumeTransfer.getSettled()))
+            {
+                interaction.dispositionSettled(true)
+                           .dispositionState(new Accepted())
+                           .dispositionRole(Role.RECEIVER)
+                           .disposition();
+            }
+
+            transport.doCloseConnection();
+
+            if (getBrokerAdmin().isQueueDepthSupported())
+            {
+                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
+                           is(equalTo(0)));
+            }
+        }
+    }
+
+    @Ignore("QPID-7845") // test is currently disabled; the annotation value references QPID-7845
+    @Test
+    @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
+                                                         + " the unsettled field from the attach frame will carry"
+                                                         + " the delivery-tags and delivery state of all deliveries"
+                                                         + " considered unsettled by the issuing link endpoint.")
+    public void resumeReceivingLinkOneUnsettledWithNoOutcome() throws Exception
+    { // Scenario: a receiver re-attaches its link, reporting its one unsettled delivery with a null delivery state.
+        getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT); // message the link is expected to be sent
+
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+            final Interaction interaction = transport.newInteraction()
+                                                     .negotiateProtocol().consumeResponse()
+                                                     .open().consumeResponse()
+                                                     .begin().consumeResponse()
+                                                     .attachRole(Role.RECEIVER)
+                                                     .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+                                                     .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+                                                     .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+                                                     .attach().consumeResponse();
+
+            Attach attach = interaction.getLatestResponse(Attach.class);
+            assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED))); // an assumption, not an assertion, on the Attach response
+
+            interaction.flowIncomingWindow(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery(); // one credit was granted above; wait for the resulting delivery
+
+            List<Transfer> transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer transfer = transfers.get(0);
+
+            Binary deliveryTag = transfer.getDeliveryTag(); // kept to identify the delivery after the link is re-attached
+            assertThat(deliveryTag, is(notNullValue()));
+            Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+            assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+            Detach detach = interaction.detach().consumeResponse(Detach.class).getLatestResponse(Detach.class);
+            assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false))); // the Detach response must not be a closing detach
+
+            interaction.attachUnsettled(Collections.singletonMap(deliveryTag, null)) // null state: no outcome is reported for the delivery
+                       .attach()
+                       .consumeResponse(Attach.class);
+
+            Attach resumeAttach = interaction.getLatestResponse(Attach.class);
+
+            Map<Binary, DeliveryState> unsettled = resumeAttach.getUnsettled();
+            assertThat(unsettled, is(notNullValue()));
+            assertThat(unsettled.entrySet(), hasSize(1)); // exactly one entry is expected, keyed by the same delivery tag
+            Map.Entry<Binary, DeliveryState> entry = unsettled.entrySet().iterator().next();
+            assertThat(entry.getKey(), is(equalTo(deliveryTag)));
+
+            interaction.flowNextIncomingId(UnsignedInteger.ONE)
+                       .flowLinkCredit(UnsignedInteger.ONE)
+                       .flowHandleFromLinkHandle()
+                       .flow()
+                       .receiveDelivery(); // grant one credit again and wait for the resumed delivery
+
+            transfers = interaction.getLatestDelivery();
+            assertThat(transfers, hasSize(1));
+            Transfer resumeTransfer = transfers.get(0);
+
+            assertThat(resumeTransfer.getResume(), is(equalTo(true))); // the transfer must be flagged as a resumed delivery
+            assertThat(resumeTransfer.getDeliveryTag(), is(equalTo(deliveryTag)));
+            assertThat(resumeTransfer.getPayload(), is(notNullValue())); // a payload is expected on the resumed transfer
+
+            interaction.dispositionSettled(true) // settle the resumed delivery with an Accepted state
+                       .dispositionState(new Accepted())
+                       .dispositionRole(Role.RECEIVER)
+                       .disposition();
+
+            transport.doCloseConnection();
+
+            if (getBrokerAdmin().isQueueDepthSupported()) // the depth check is skipped when the broker admin cannot report it
+            {
+                assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
+                           Matchers.is(Matchers.equalTo(0))); // the queue is expected to be empty at this point
+            }
+        }
+    }
+
+    @Ignore("QPID-7845") // test is currently disabled; the annotation value references QPID-7845
+    @Test
+    @SpecificationTest(section = "2.6.13",
+            description = "When a suspended link having unsettled deliveries is resumed, the unsettled field from the"
+                          + " attach frame will carry the delivery-tags and delivery state of all deliveries"
+                          + " considered unsettled by the issuing link endpoint.")
+    public void resumeSendingLinkSinglePartialDelivery() throws Exception
+    { // Scenario: a sender transfers the first part of a delivery, detaches, re-attaches and then completes the delivery.
+        final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+        final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8));
+
+        QpidByteBuffer[] messagePayload = Utils.splitPayload("testData1", 2); // [0] is sent before the detach, [1] after the re-attach
+        try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+        {
+
+            final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+            final Interaction interaction = transport.newInteraction();
+            interaction.negotiateProtocol().consumeResponse()
+                       .open().consumeResponse(Open.class)
+                       .begin().consumeResponse(Begin.class);
+
+            // 1. attach a sending link to the destination (no receiver settle mode is set anywhere in this method)
+            interaction.attachHandle(linkHandle1)
+                       .attachRole(Role.SENDER)
+                       .attachTargetAddress(destination)
+                       .attach().consumeResponse(Attach.class)
+                       .consumeResponse(Flow.class); // an Attach followed by a Flow is expected from the peer
+
+            // 2. send a partial delivery
+            interaction.transferHandle(linkHandle1)
+                       .transferDeliveryId(UnsignedInteger.ZERO)
+                       .transferDeliveryTag(deliveryTag)
+                       .transferMore(true) // more=true: the delivery is not complete after this frame
+                       .transferPayload(Collections.singletonList(messagePayload[0]))
+                       .transfer();
+
+            // 3. detach the link
+            interaction.detach().consumeResponse(Detach.class);
+
+            // 4. resume the link
+            final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+            final Attach responseAttach2 = interaction.attachHandle(linkHandle2)
+                                                      .attachUnsettled(Collections.singletonMap(deliveryTag, null)) // tag reported with a null state
+                                                      .attach().consumeResponse()
+                                                      .getLatestResponse(Attach.class);
+
+            // 5. assert content of unsettled map
+            assertThat(responseAttach2.getTarget(), is(notNullValue()));
+
+            final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+            assertThat(remoteUnsettled, is(notNullValue()));
+            assertThat(remoteUnsettled.keySet(), is(equalTo(Collections.singleton(deliveryTag)))); // exactly the one partial delivery
+
+            interaction.transferHandle(linkHandle2) // 6. send a transfer with resume=true, then the remaining payload part
+                       .transferResume(true)
+                       .transfer()
+                       .sync()
+                       .transferMore(false) // more=false: this frame completes the delivery
+                       .transferPayload(Collections.singletonList(messagePayload[1]))
+                       .transfer();
+
+            boolean settled = false; // 7. consume responses until a Disposition is seen
+            do
+            {
+                interaction.consumeResponse();
+                Response<?> response = interaction.getLatestResponse();
+                assertThat(response, is(notNullValue()));
+
+                Object body = response.getBody();
+
+                if (body instanceof Disposition)
+                {
+                    Disposition disposition = (Disposition) body;
+                    assertThat(disposition.getSettled(), is(Matchers.equalTo(true)));
+                    assertThat(disposition.getFirst(), equalTo(UnsignedInteger.ZERO)); // delivery-id used for the first transfer
+                    settled = true;
+                }
+                else if (!(body instanceof Flow)) // Flow frames are tolerated while waiting; anything else fails the test
+                {
+                    fail("Unexpected response " + body);
+                }
+            }
+            while (!settled);
+
+            transport.doCloseConnection();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
index 3868246..e313222 100644
--- a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
@@ -63,7 +63,7 @@ public class ZeroPrefetchTest extends QpidBrokerTestCase
         producer.send(secondMessage);
 
 
-        Message receivedMessage = prefetch1consumer.receive(2000l);
+        final Message receivedMessage = prefetch1consumer.receive(2000l);
         assertNotNull("First message was not received", receivedMessage);
         assertEquals("Message property was not as expected", firstPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
 
@@ -73,9 +73,9 @@ public class ZeroPrefetchTest extends QpidBrokerTestCase
         final Session prefetch2session = prefetch2Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer prefetch2consumer = prefetch2session.createConsumer(queue);
 
-        receivedMessage = prefetch2consumer.receive(2000l);
-        assertNotNull("Second message was not received", receivedMessage);
-        assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
+        final Message receivedMessage2 = prefetch2consumer.receive(2000l);
+        assertNotNull("Second message was not received", receivedMessage2);
+        assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage2.getStringProperty(TEST_PROPERTY_NAME));
 
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-broker-j git commit: QPID-7649: [Java Broker] [AMQP1.0] Add support for Attach with incomplete-unsettled

Posted by or...@apache.org.
QPID-7649: [Java Broker] [AMQP1.0] Add support for Attach with incomplete-unsettled


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/737c5280
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/737c5280
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/737c5280

Branch: refs/heads/master
Commit: 737c52807080b2e54fa6f4b419c0086df375e2bc
Parents: 72ed1aa
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jun 30 17:15:04 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jun 30 17:15:04 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AbstractLinkEndpoint.java     |  82 +-
 .../v1_0/AbstractReceivingLinkEndpoint.java     |  60 +-
 .../protocol/v1_0/SendingLinkEndpoint.java      |   3 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java  |   6 +-
 .../v1_0/StandardReceivingLinkEndpoint.java     |  25 +-
 .../protocol/v1_0/framing/FrameHandler.java     |   1 +
 .../v1_0/framing/OversizeFrameException.java    |   1 +
 .../protocol/v1_0/type/messaging/Accepted.java  |  30 +-
 .../protocol/v1_0/type/messaging/Modified.java  |  47 +-
 .../protocol/v1_0/type/messaging/Received.java  |  24 +-
 .../protocol/v1_0/type/messaging/Rejected.java  |  40 +-
 .../protocol/v1_0/type/messaging/Released.java  |  29 +-
 .../v1_0/type/transaction/Declared.java         |  31 +-
 .../type/transaction/TransactionalState.java    |  25 +-
 .../server/protocol/v1_0/Session_1_0Test.java   |  10 +
 .../tests/protocol/v1_0/FrameTransport.java     |   2 +-
 .../qpid/tests/protocol/v1_0/Interaction.java   |  77 +-
 .../apache/qpid/tests/protocol/v1_0/Utils.java  |  48 ++
 .../v1_0/messaging/MultiTransferTest.java       |  91 +--
 .../protocol/v1_0/messaging/TransferTest.java   |  42 +-
 .../transaction/TransactionalTransferTest.java  |   1 -
 .../transport/link/ResumeDeliveriesTest.java    | 766 +++++++++++++++++++
 .../qpid/systest/prefetch/ZeroPrefetchTest.java |   8 +-
 23 files changed, 1124 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 55432c9..5410766 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -24,6 +24,8 @@ package org.apache.qpid.server.protocol.v1_0;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -31,6 +33,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
 import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
 import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -38,8 +41,11 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
 import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
@@ -49,6 +55,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
 public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseTarget> implements LinkEndpoint<S, T>
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
+    private static final int FRAME_HEADER_SIZE = 8;
+
     private final Link_1_0<S, T> _link;
     private final Session_1_0 _session;
 
@@ -66,6 +74,9 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
     private volatile Map<Symbol, Object> _properties;
     private volatile State _state = State.ATTACH_RECVD;
 
+    protected boolean _remoteIncompleteUnsettled;
+    protected boolean _localIncompleteUnsettled;
+
     protected enum State
     {
         DETACHED,
@@ -129,7 +140,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         _receivingSettlementMode = attach.getRcvSettleMode();
         _properties = initProperties(attach);
         _state = State.ATTACH_RECVD;
-
+        _remoteIncompleteUnsettled = Boolean.TRUE.equals(attach.getIncompleteUnsettled());
         if (getRole() == Role.RECEIVER)
         {
             getSession().getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
@@ -306,6 +317,8 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
             attachToSend.setInitialDeliveryCount(_deliveryCount.unsignedIntegerValue());
         }
 
+        attachToSend = handleOversizedUnsettledMapIfNecessary(attachToSend);
+
         switch (_state)
         {
             case DETACHED:
@@ -322,6 +335,73 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
 
     }
 
+    /**
+     * Shrinks the unsettled map of the Attach to send when the encoded Attach plus frame header exceeds the
+     * connection's max frame size: sets incomplete-unsettled and binary-searches for an entry count (taken in
+     * iteration order) that fits; ends the session with FRAME_SIZE_TOO_SMALL when no non-empty map fits.
+     */
+    private Attach handleOversizedUnsettledMapIfNecessary(final Attach attachToSend)
+    {
+        final AMQPDescribedTypeRegistry describedTypeRegistry = getSession().getConnection().getDescribedTypeRegistry();
+        final ValueWriter<Attach> valueWriter = describedTypeRegistry.getValueWriter(attachToSend);
+        final int targetSize = getSession().getConnection().getMaxFrameSize();
+        if (valueWriter.getEncodedSize() + FRAME_HEADER_SIZE > targetSize)
+        {
+            _localIncompleteUnsettled = true;
+            attachToSend.setIncompleteUnsettled(true);
+            int lowIndex = 0;
+            Map<Binary, DeliveryState> localUnsettledMap = attachToSend.getUnsettled();
+            if (localUnsettledMap == null)
+            {
+                localUnsettledMap = Collections.emptyMap();
+            }
+            int highIndex = localUnsettledMap.size();
+            int currentIndex = (highIndex - lowIndex) / 2;
+            int oldIndex;
+            HashMap<Binary, DeliveryState> unsettledMap = null; // most recent candidate map that fitted, if any
+            do
+            {
+                // candidate: the first currentIndex entries of the full map, in its iteration order
+                HashMap<Binary, DeliveryState> partialUnsettledMap = new HashMap<>(currentIndex);
+                final Iterator<Map.Entry<Binary, DeliveryState>> iterator = localUnsettledMap.entrySet().iterator();
+                for (int i = 0; i < currentIndex; ++i)
+                {
+                    final Map.Entry<Binary, DeliveryState> entry = iterator.next();
+                    partialUnsettledMap.put(entry.getKey(), entry.getValue());
+                }
+                attachToSend.setUnsettled(partialUnsettledMap);
+                final int totalSize = describedTypeRegistry.getValueWriter(attachToSend).getEncodedSize() + FRAME_HEADER_SIZE;
+                if (totalSize > targetSize)
+                {
+                    highIndex = currentIndex;
+                }
+                else
+                {
+                    lowIndex = currentIndex; // candidate fits: keep it and only look at larger entry counts
+                    highIndex = totalSize == targetSize ? currentIndex : highIndex; // exact fit ends the search
+                    unsettledMap = partialUnsettledMap;
+                }
+
+                oldIndex = currentIndex;
+                currentIndex = lowIndex + (highIndex - lowIndex) / 2;
+            }
+            while (oldIndex != currentIndex); // stop once the midpoint no longer moves
+
+            if (unsettledMap == null || unsettledMap.isEmpty())
+            {
+                final End endWithError = new End();
+                endWithError.setError(new Error(AmqpError.FRAME_SIZE_TOO_SMALL, "Cannot fit a single unsettled delivery into Attach frame."));
+                getSession().end(endWithError);
+            }
+
+            attachToSend.setUnsettled(unsettledMap);
+        }
+        else
+        {
+            _localIncompleteUnsettled = false;
+        }
+        return attachToSend;
+    }
+
     public void detach()
     {
         detach(null, false);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index e87fffb..0b78622 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -117,16 +117,8 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             {
                 _unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
             }
-            else if (!_unsettled.containsKey(_currentDelivery.getDeliveryTag()))
-            {
-                final Error error = new Error(AmqpError.ILLEGAL_STATE,
-                                              String.format("Resumed transfer with delivery tag '%s' is not found.",
-                                                            _currentDelivery.getDeliveryTag()));
-                close(error);
-                return;
-            }
 
-            if (_currentDelivery.isAborted())
+            if (_currentDelivery.isAborted() || (_currentDelivery.getResume() && !_unsettled.containsKey(_currentDelivery.getDeliveryTag())))
             {
                 _unsettled.remove(_currentDelivery.getDeliveryTag());
                 getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
@@ -176,14 +168,23 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
             error = new Error(AmqpError.INVALID_FIELD,
                                     "Transfer \"delivery-tag\" is required for a new delivery.");
         }
-        else if (_unsettled.containsKey(transfer.getDeliveryTag()))
+        else if (!Boolean.TRUE.equals(transfer.getResume()))
         {
-            error = new Error(AmqpError.ILLEGAL_STATE,
-                              String.format("Delivery-tag '%s' is used by another unsettled delivery."
-                                            + " The delivery-tag MUST be unique amongst all deliveries that"
-                                            + " could be considered unsettled by either end of the link.",
-                                            transfer.getDeliveryTag()));
+            if (_unsettled.containsKey(transfer.getDeliveryTag()))
+            {
+                error = new Error(AmqpError.ILLEGAL_STATE,
+                                  String.format("Delivery-tag '%s' is used by another unsettled delivery."
+                                                + " The delivery-tag MUST be unique amongst all deliveries that"
+                                                + " could be considered unsettled by either end of the link.",
+                                                transfer.getDeliveryTag()));
+            }
+            else if (_localIncompleteUnsettled || _remoteIncompleteUnsettled)
+            {
+                error = new Error(AmqpError.ILLEGAL_STATE,
+                                  "Cannot accept new deliveries while incomplete-unsettled is true.");
+            }
         }
+
         return error;
     }
 
@@ -297,20 +298,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
         sendFlowConditional();
     }
 
-    @Override
-    public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
-    {
-        super.receiveDeliveryState(deliveryTag, state, settled);
-        if(_creditWindow)
-        {
-            if(Boolean.TRUE.equals(settled))
-            {
-                setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
-                sendFlowConditional();
-            }
-        }
-    }
-
     SectionDecoder getSectionDecoder()
     {
         return _sectionDecoder;
@@ -321,11 +308,11 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
     {
         super.settle(deliveryTag);
         _unsettled.remove(deliveryTag);
-        if(_creditWindow)
+        if (_creditWindow)
         {
-             sendFlowConditional();
+            setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+            sendFlowConditional();
         }
-
     }
 
     public void flowStateChanged()
@@ -341,13 +328,10 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
         }
         finally
         {
-            if (close)
+            if (_currentDelivery != null)
             {
-                if (_currentDelivery != null)
-                {
-                    _currentDelivery.discard();
-                    _currentDelivery = null;
-                }
+                _currentDelivery.discard();
+                _currentDelivery = null;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index e6763eb..a78f7f0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -644,11 +644,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         Map<Binary, DeliveryState> remoteUnsettled =
                 attach.getUnsettled() == null ? Collections.emptyMap() : new HashMap<>(attach.getUnsettled());
 
+        final boolean isUnsettledComplete = !Boolean.TRUE.equals(attach.getIncompleteUnsettled());
         for (Map.Entry<Binary, OutgoingDelivery> entry : unsettledCopy.entrySet())
         {
             Binary deliveryTag = entry.getKey();
             final MessageInstance queueEntry = entry.getValue().getMessageInstance();
-            if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+            if (!remoteUnsettled.containsKey(deliveryTag) && isUnsettledComplete)
             {
                 queueEntry.setRedelivered();
                 queueEntry.release(oldConsumer);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 65a5265..ba06376 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -311,7 +311,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         try
         {
             List<QpidByteBuffer> payload = xfr.getPayload();
-            final long remaining = QpidByteBufferUtils.remaining(payload);
+            final long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload);
             int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
 
             if(payload != null && payloadSent < remaining && payloadSent >= 0)
@@ -320,11 +320,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
                 Transfer secondTransfer = new Transfer();
 
-                secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
                 secondTransfer.setHandle(xfr.getHandle());
-                secondTransfer.setSettled(xfr.getSettled());
+                secondTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
                 secondTransfer.setState(xfr.getState());
-                secondTransfer.setMessageFormat(xfr.getMessageFormat());
                 secondTransfer.setPayload(payload);
 
                 sendTransfer(secondTransfer, endpoint, false);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 048fa20..447e7cb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
 import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
 import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -270,8 +271,11 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
     @Override
     protected void remoteDetachedPerformDetach(Detach detach)
     {
-        if(!TerminusDurability.UNSETTLED_STATE.equals(getDurability()) ||
-           (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+        final TerminusExpiryPolicy expiryPolicy = getTarget().getExpiryPolicy();
+        if((detach != null && Boolean.TRUE.equals(detach.getClosed()))
+           || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
+           || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
+           || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
         {
             close();
         }
@@ -332,6 +336,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
             }
             target.setCapabilities(targetCapabilities.toArray(new Symbol[targetCapabilities.size()]));
         }
+        target.setExpiryPolicy(attachTarget.getExpiryPolicy());
 
         final ReceivingDestination destination = getSession().getReceivingDestination(getLink(), target);
 
@@ -341,17 +346,19 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         setCapabilities(targetCapabilities);
         setDestination(destination);
 
-        Map remoteUnsettled = attach.getUnsettled();
-        Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
-        for(Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
+        if (!Boolean.TRUE.equals(attach.getIncompleteUnsettled()))
         {
-            Binary deliveryTag = entry.getKey();
-            if(remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+            Map remoteUnsettled = attach.getUnsettled();
+            Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
+            for (Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
             {
-                _unsettled.remove(deliveryTag); // todo: removal is based on assumption that remote unsettled map is complete
+                Binary deliveryTag = entry.getKey();
+                if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+                {
+                    _unsettled.remove(deliveryTag);
+                }
             }
         }
-
         getLink().setTermini(source, target);
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index c5e0334..581c3c5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -59,6 +59,7 @@ public class FrameHandler implements ProtocolHandler
     {
         try
         {
+            LOGGER.debug("RECV {} bytes", in.remaining());
             Error frameParsingError = null;
             int size;
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
index e4a94bf..a77e587 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
@@ -26,6 +26,7 @@ public class OversizeFrameException extends RuntimeException
 
     public OversizeFrameException(final AMQFrame frame, final int size)
     {
+        super("Tried to send frame of size: " + String.valueOf(size));
         _frame = frame;
         _size = size;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
index fb9f446..abf8509 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
@@ -23,31 +23,25 @@
 
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
+public class Accepted implements Outcome
+{
+    public static final Symbol ACCEPTED_SYMBOL = Symbol.valueOf("amqp:accepted:list");
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Accepted
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
-      public static final Symbol ACCEPTED_SYMBOL = Symbol.valueOf("amqp:accepted:list");
-
-      @Override
-      public Symbol getSymbol()
-      {
-          return ACCEPTED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return ACCEPTED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Accepted{");
-        final int origLength = builder.length();
 
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
index 43264a9..41d2fa7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
@@ -24,21 +24,18 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
-
 import java.util.Map;
 
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Modified
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
+public class Modified implements Outcome
+{
+    public static final Symbol MODIFIED_SYMBOL = Symbol.valueOf("amqp:modified:list");
 
-      public static final Symbol MODIFIED_SYMBOL = Symbol.valueOf("amqp:modified:list");
-
-      @CompositeTypeField
-      private Boolean _deliveryFailed;
+    @CompositeTypeField
+    private Boolean _deliveryFailed;
 
     @CompositeTypeField
     private Boolean _undeliverableHere;
@@ -76,39 +73,39 @@ public class Modified
         _messageAnnotations = messageAnnotations;
     }
 
-      @Override
-      public Symbol getSymbol()
-      {
-          return MODIFIED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return MODIFIED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Modified{");
         final int origLength = builder.length();
 
-        if(_deliveryFailed != null)
+        if (_deliveryFailed != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("deliveryFailed=").append(_deliveryFailed);
         }
 
-        if(_undeliverableHere != null)
+        if (_undeliverableHere != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("undeliverableHere=").append(_undeliverableHere);
         }
 
-        if(_messageAnnotations != null)
+        if (_messageAnnotations != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -118,6 +115,4 @@ public class Modified
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
index 3ad8cb7..86d9554 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
@@ -24,13 +24,13 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Received
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState
-  {
-
+public class Received implements DeliveryState
+{
     @CompositeTypeField(mandatory = true)
     private UnsignedInteger _sectionNumber;
 
@@ -63,18 +63,18 @@ public class Received
         StringBuilder builder = new StringBuilder("Received{");
         final int origLength = builder.length();
 
-        if(_sectionNumber != null)
+        if (_sectionNumber != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("sectionNumber=").append(_sectionNumber);
         }
 
-        if(_sectionOffset != null)
+        if (_sectionOffset != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -84,6 +84,4 @@ public class Received
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
index 5f5ca24..e877a56 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
@@ -24,21 +24,17 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
-
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
 
+public class Rejected implements Outcome
+{
+    public static final Symbol REJECTED_SYMBOL = Symbol.valueOf("amqp:rejected:list");
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Rejected
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
-
-      public static final Symbol REJECTED_SYMBOL = Symbol.valueOf("amqp:rejected:list");
-
-      @CompositeTypeField
-      private Error _error;
+    @CompositeTypeField
+    private Error _error;
 
     public Error getError()
     {
@@ -50,21 +46,21 @@ public class Rejected
         _error = error;
     }
 
-      @Override
-      public Symbol getSymbol()
-      {
-          return REJECTED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return REJECTED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Rejected{");
         final int origLength = builder.length();
 
-        if(_error != null)
+        if (_error != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -74,6 +70,4 @@ public class Rejected
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
index 4ba74f4..945c7ce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
@@ -24,23 +24,20 @@
 package org.apache.qpid.server.protocol.v1_0.type.messaging;
 
 
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
+public class Released implements Outcome
+{
+    public static final Symbol RELEASED_SYMBOL = Symbol.valueOf("amqp:released:list");
 
-public class Released
-  implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
-  {
-
-
-      public static final Symbol RELEASED_SYMBOL = Symbol.valueOf("amqp:released:list");
-
-      @Override
-      public Symbol getSymbol()
-      {
-          return RELEASED_SYMBOL;
-      }
+    @Override
+    public Symbol getSymbol()
+    {
+        return RELEASED_SYMBOL;
+    }
 
-      @Override
+    @Override
     public String toString()
     {
         StringBuilder builder = new StringBuilder("Released{");
@@ -49,6 +46,4 @@ public class Released
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
index 17c4e62..a9b9343 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
@@ -24,22 +24,17 @@
 package org.apache.qpid.server.protocol.v1_0.type.transaction;
 
 
-
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 
-public class Declared
-  implements DeliveryState, Outcome
-  {
-
-
-      public static final Symbol DECLARED_SYMBOL = Symbol.valueOf("amqp:declared:list");
+public class Declared implements Outcome
+{
+    public static final Symbol DECLARED_SYMBOL = Symbol.valueOf("amqp:declared:list");
 
-      @CompositeTypeField(mandatory = true)
-      private Binary _txnId;
+    @CompositeTypeField(mandatory = true)
+    private Binary _txnId;
 
     public Binary getTxnId()
     {
@@ -57,9 +52,9 @@ public class Declared
         StringBuilder builder = new StringBuilder("Declared{");
         final int origLength = builder.length();
 
-        if(_txnId != null)
+        if (_txnId != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -71,9 +66,9 @@ public class Declared
     }
 
 
-      @Override
-      public Symbol getSymbol()
-      {
-          return DECLARED_SYMBOL;
-      }
-  }
+    @Override
+    public Symbol getSymbol()
+    {
+        return DECLARED_SYMBOL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
index 1f2d29b..20db2a1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
@@ -24,14 +24,13 @@
 package org.apache.qpid.server.protocol.v1_0.type.transaction;
 
 
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
 
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class TransactionalState
-  implements DeliveryState
-  {
-
-
+public class TransactionalState implements DeliveryState
+{
     @CompositeTypeField(mandatory = true)
     private Binary _txnId;
 
@@ -64,18 +63,18 @@ public class TransactionalState
         StringBuilder builder = new StringBuilder("TransactionalState{");
         final int origLength = builder.length();
 
-        if(_txnId != null)
+        if (_txnId != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
             builder.append("txnId=").append(_txnId);
         }
 
-        if(_outcome != null)
+        if (_outcome != null)
         {
-            if(builder.length() != origLength)
+            if (builder.length() != origLength)
             {
                 builder.append(',');
             }
@@ -85,6 +84,4 @@ public class TransactionalState
         builder.append('}');
         return builder.toString();
     }
-
-
-  }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 4a22d8e..468fe15 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -62,6 +62,7 @@ import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
 import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.Symbol;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -79,6 +80,13 @@ import org.apache.qpid.test.utils.QpidTestCase;
 
 public class Session_1_0Test extends QpidTestCase
 {
+    private static final AMQPDescribedTypeRegistry DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+                                                                                                      .registerTransportLayer()
+                                                                                                      .registerMessagingLayer()
+                                                                                                      .registerTransactionLayer()
+                                                                                                      .registerSecurityLayer()
+                                                                                                      .registerExtensionSoleconnLayer();
+
     private static final String TOPIC_NAME = "testTopic";
     private static final String QUEUE_NAME = "testQueue";
     private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
@@ -721,6 +729,8 @@ public class Session_1_0Test extends QpidTestCase
         when(connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
         when(connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
         when(connection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+        when(connection.getDescribedTypeRegistry()).thenReturn(DESCRIBED_TYPE_REGISTRY);
+        when(connection.getMaxFrameSize()).thenReturn(512);
         final ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
         when(connection.doOnIOThreadAsync(runnableCaptor.capture())).thenAnswer(new Answer<ListenableFuture<Void>>()
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 9aeb4e9..8746941 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -62,7 +62,7 @@ public class FrameTransport implements AutoCloseable
     public static final long RESPONSE_TIMEOUT = 6000;
     private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
 
-    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(100);
+    private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
 
     private final EventLoopGroup _workerGroup;
     private final InetSocketAddress _brokerAddress;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 008b016..3a6b083 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -228,15 +228,15 @@ public class Interaction
         return this;
     }
 
-    public Interaction openChannelMax(UnsignedShort channelMax)
+    public Interaction openMaxFrameSize(final UnsignedInteger maxFrameSize)
     {
-        _open.setChannelMax(channelMax);
+        _open.setMaxFrameSize(maxFrameSize);
         return this;
     }
 
-    public Interaction openMaxFrameSize(UnsignedInteger maxFrameSize)
+    public Interaction openChannelMax(UnsignedShort channelMax)
     {
-        _open.setMaxFrameSize(maxFrameSize);
+        _open.setChannelMax(channelMax);
         return this;
     }
 
@@ -397,6 +397,18 @@ public class Interaction
         return this;
     }
 
+    public Interaction attachUnsettled(final Map<Binary, DeliveryState> unsettled)
+    {
+        _attach.setUnsettled(unsettled);
+        return this;
+    }
+
+    public Interaction attachIncompleteUnsettled(final Boolean incompleteUnsettled)
+    {
+        _attach.setIncompleteUnsettled(incompleteUnsettled);
+        return this;
+    }
+
     public Interaction attach() throws Exception
     {
         sendPerformativeAndChainFuture(_attach, _sessionChannel);
@@ -501,28 +513,33 @@ public class Interaction
         return this;
     }
 
+    public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
+    {
+        _transfer.setDeliveryId(deliveryId);
+        return this;
+    }
+
     public Interaction transferDeliveryTag(final Binary deliveryTag)
     {
         _transfer.setDeliveryTag(deliveryTag);
         return this;
     }
 
-    public Interaction transferState(final DeliveryState state)
+    public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
     {
-        _transfer.setState(state);
+        _transfer.setMessageFormat(messageFormat);
         return this;
     }
 
-    public Interaction transferTransactionalState(final Binary transactionalId)
+    public Interaction transferSettled(final Boolean settled)
     {
-        TransactionalState transactionalState = new TransactionalState();
-        transactionalState.setTxnId(transactionalId);
-        return transferState(transactionalState);
+        _transfer.setSettled(settled);
+        return this;
     }
 
-    public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
+    public Interaction transferMore(final Boolean more)
     {
-        _transfer.setDeliveryId(deliveryId);
+        _transfer.setMore(more);
         return this;
     }
 
@@ -532,25 +549,32 @@ public class Interaction
         return this;
     }
 
-    public Interaction transferMore(final Boolean more)
+    public Interaction transferState(final DeliveryState state)
     {
-        _transfer.setMore(more);
+        _transfer.setState(state);
         return this;
     }
 
-    public Interaction transferAborted(final Boolean aborted)
+    public Interaction transferTransactionalState(final Binary transactionalId)
     {
-        _transfer.setAborted(aborted);
+        TransactionalState transactionalState = new TransactionalState();
+        transactionalState.setTxnId(transactionalId);
+        return transferState(transactionalState);
+    }
+
+    public Interaction transferResume(final Boolean resume)
+    {
+        _transfer.setResume(resume);
         return this;
     }
 
-    public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
+    public Interaction transferAborted(final Boolean aborted)
     {
-        _transfer.setMessageFormat(messageFormat);
+        _transfer.setAborted(aborted);
         return this;
     }
 
-    public Interaction setPayloadOnTransfer(final List<QpidByteBuffer> payload)
+    public Interaction transferPayload(final List<QpidByteBuffer> payload)
     {
         _transfer.setPayload(payload);
         return this;
@@ -558,11 +582,11 @@ public class Interaction
 
     public Interaction transferPayloadData(final Object payload)
     {
-        setPayloadOnTransfer(_transfer, payload);
+        transferPayload(_transfer, payload);
         return this;
     }
 
-    private void setPayloadOnTransfer(final Transfer transfer, final Object payload)
+    private void transferPayload(final Transfer transfer, final Object payload)
     {
         AmqpValue amqpValue = new AmqpValue(payload);
         final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
@@ -575,12 +599,6 @@ public class Interaction
         }
     }
 
-    public Interaction transferSettled(final Boolean settled)
-    {
-        _transfer.setSettled(settled);
-        return this;
-    }
-
     public Interaction transfer() throws Exception
     {
         sendPerformativeAndChainFuture(_transfer, _sessionChannel);
@@ -648,7 +666,7 @@ public class Interaction
     public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
     {
         Transfer transfer = createTransactionTransfer(txnState.getHandle());
-        setPayloadOnTransfer(transfer, new Declare());
+        transferPayload(transfer, new Declare());
         sendPerformativeAndChainFuture(transfer, _sessionChannel);
         consumeResponse(Disposition.class);
         Disposition declareTransactionDisposition = getLatestResponse(Disposition.class);
@@ -668,7 +686,7 @@ public class Interaction
         discharge.setFail(failed);
 
         Transfer transfer = createTransactionTransfer(txnState.getHandle());
-        setPayloadOnTransfer(transfer, discharge);
+        transferPayload(transfer, discharge);
         sendPerformativeAndChainFuture(transfer, _sessionChannel);
 
         Disposition declareTransactionDisposition = null;
@@ -818,6 +836,7 @@ public class Interaction
 
     public Interaction receiveDelivery() throws Exception
     {
+        sync();
         _latestDelivery = receiveAllTransfers();
         return this;
     }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 446c112..f0373f5 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -25,8 +25,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import java.net.InetSocketAddress;
+import java.util.List;
 
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -87,4 +91,68 @@ public class Utils
             return interaction.getDecodedLatestDelivery();
         }
     }
+
+    /**
+     * Encodes {@code messageContent} as a message and copies the encoded bytes into
+     * {@code numberOfParts} consecutive buffers.
+     * <p>
+     * Every part except the last holds {@code size / numberOfParts} bytes (integer division);
+     * the last part also takes the remainder.
+     *
+     * @param messageContent content passed to {@code MessageEncoder.addData}
+     * @param numberOfParts number of buffers to produce; must be positive
+     * @return the parts in order, each flipped once it has been filled
+     * @throws IllegalArgumentException if {@code numberOfParts} is not positive
+     */
+    public static QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts)
+    {
+        // Fail fast: zero would divide by zero below and a negative count cannot size the array.
+        if (numberOfParts <= 0)
+        {
+            throw new IllegalArgumentException("numberOfParts must be positive: " + numberOfParts);
+        }
+
+        MessageEncoder messageEncoder = new MessageEncoder();
+        final Header header = new Header();
+        messageEncoder.setHeader(header);
+        messageEncoder.addData(messageContent);
+        List<QpidByteBuffer> payload = messageEncoder.getPayload();
+        final int size = (int) QpidByteBufferUtils.remaining(payload);
+
+        QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
+        int chunkSize = size / numberOfParts;
+        int lastChunkSize = size - chunkSize * (numberOfParts - 1);
+        for (int i = 0; i < numberOfParts; i++)
+        {
+            result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize);
+        }
+
+        int currentBufferIndex = 0;
+        for (QpidByteBuffer p : payload)
+        {
+            final int limit = p.limit();
+
+            while (p.hasRemaining())
+            {
+                QpidByteBuffer currentBuffer = result[currentBufferIndex];
+                if (currentBuffer.hasRemaining())
+                {
+                    // Narrow the source to what fits in the current part, copy it, then restore the limit.
+                    int length = Math.min(p.remaining(), currentBuffer.remaining());
+                    p.limit(p.position() + length);
+                    currentBuffer.put(p.slice());
+                    p.position(p.position() + length);
+                    p.limit(limit);
+                }
+
+                if (!currentBuffer.hasRemaining())
+                {
+                    // Part is full: make it readable and move on to the next one.
+                    currentBuffer.flip();
+                    currentBufferIndex++;
+                }
+            }
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 2f83a13..1c09bb6 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -31,7 +31,6 @@ import static org.hamcrest.Matchers.isOneOf;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import org.junit.After;
@@ -39,11 +38,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
 import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
@@ -57,10 +54,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
 import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
 import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
 import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
 import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
 import org.apache.qpid.tests.protocol.v1_0.Response;
 import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
 
 public class MultiTransferTest extends ProtocolTestBase
 {
@@ -99,7 +96,7 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] payloads = splitPayload("testData", 2);
+            QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
 
             final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
             final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -114,14 +111,14 @@ public class MultiTransferTest extends ProtocolTestBase
                                                  .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                                                  .attach().consumeResponse(Attach.class)
                                                  .consumeResponse(Flow.class)
-                                                 .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                                                 .transferPayload(Collections.singletonList(payloads[0]))
                                                  .transferDeliveryId(deliveryId)
                                                  .transferDeliveryTag(deliveryTag)
                                                  .transferMore(true)
                                                  .transfer()
                                                  .sync()
                                                  .transferMore(false)
-                                                 .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+                                                 .transferPayload(Collections.singletonList(payloads[1]))
                                                  .transfer()
                                                  .consumeResponse()
                                                  .getLatestResponse(Disposition.class);
@@ -141,7 +138,7 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] payloads = splitPayload("testData", 4);
+            QpidByteBuffer[] payloads = Utils.splitPayload("testData", 4);
             final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
             final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
 
@@ -158,25 +155,25 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                       .transferPayload(Collections.singletonList(payloads[0]))
                        .transfer()
                        .sync()
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(null)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+                       .transferPayload(Collections.singletonList(payloads[1]))
                        .transfer()
                        .sync()
                        .transferDeliveryId(null)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[2]))
+                       .transferPayload(Collections.singletonList(payloads[2]))
                        .transfer()
                        .sync()
                        .transferDeliveryId(null)
                        .transferDeliveryTag(null)
                        .transferMore(false)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[3]))
+                       .transferPayload(Collections.singletonList(payloads[3]))
                        .transfer()
                        .consumeResponse();
 
@@ -200,7 +197,7 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] payloads = splitPayload("testData", 2);
+            QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
 
             final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
             final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -215,13 +212,13 @@ public class MultiTransferTest extends ProtocolTestBase
                        .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                        .attach().consumeResponse(Attach.class)
                        .consumeResponse(Flow.class)
-                       .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+                       .transferPayload(Collections.singletonList(payloads[0]))
                        .transferDeliveryId(deliveryId)
                        .transferDeliveryTag(deliveryTag)
                        .transferMore(true)
                        .transfer()
                        .sync()
-                       .setPayloadOnTransfer(null)
+                       .transferPayload(null)
                        .transferMore(null)
                        .transferAborted(true)
                        .transfer();
@@ -237,8 +234,8 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
-            QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+            QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
+            QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
 
             UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
             UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
@@ -275,7 +272,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliverId1)
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+                       .transferPayload(Collections.singletonList(messagePayload1[0]))
                        .transfer()
                        .sync()
 
@@ -283,7 +280,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+                       .transferPayload(Collections.singletonList(messagePayload2[0]))
                        .transfer()
                        .sync()
 
@@ -291,7 +288,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliverId1)
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(false)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload1[1]))
+                       .transferPayload(Collections.singletonList(messagePayload1[1]))
                        .transfer()
                        .sync()
 
@@ -299,7 +296,7 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(false)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload2[1]))
+                       .transferPayload(Collections.singletonList(messagePayload2[1]))
                        .transfer()
                        .sync();
 
@@ -328,8 +325,8 @@ public class MultiTransferTest extends ProtocolTestBase
     {
         try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
         {
-            QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
-            QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+            QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
+            QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
 
             Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
             Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
@@ -352,62 +349,18 @@ public class MultiTransferTest extends ProtocolTestBase
                        .transferDeliveryId(deliverId1)
                        .transferDeliveryTag(deliveryTag1)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+                       .transferPayload(Collections.singletonList(messagePayload1[0]))
                        .transfer()
                        .sync()
 
                        .transferDeliveryId(deliveryId2)
                        .transferDeliveryTag(deliveryTag2)
                        .transferMore(true)
-                       .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+                       .transferPayload(Collections.singletonList(messagePayload2[0]))
                        .transfer()
                        .sync();
 
             interaction.consumeResponse(Detach.class, End.class, Close.class);
         }
     }
-
-    private QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts)
-    {
-        MessageEncoder messageEncoder = new MessageEncoder();
-        final Header header = new Header();
-        messageEncoder.setHeader(header);
-        messageEncoder.addData(messageContent);
-        List<QpidByteBuffer> payload = messageEncoder.getPayload();
-        long size = QpidByteBufferUtils.remaining(payload);
-
-        QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
-        int chunkSize = (int) size / numberOfParts;
-        int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1);
-        for (int i = 0; i < numberOfParts; i++)
-        {
-            result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize);
-        }
-
-        int currentBufferIndex = 0;
-        for (QpidByteBuffer p : payload)
-        {
-            final int limit = p.limit();
-
-            while (p.hasRemaining())
-            {
-                QpidByteBuffer currentBuffer = result[currentBufferIndex];
-                if (currentBuffer.hasRemaining())
-                {
-                    int length = Math.min(p.remaining(), currentBuffer.remaining());
-                    p.limit(p.position() + length);
-                    currentBuffer.put(p.slice());
-                    p.position(p.position() + length);
-                    p.limit(limit);
-                }
-
-                if (!currentBuffer.hasRemaining())
-                {
-                    currentBuffer.flip();
-                    currentBufferIndex++;
-                }
-            }
-        }
-        return result;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index a5f23e2..85baddf 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -21,10 +21,8 @@
 package org.apache.qpid.tests.protocol.v1_0.messaging;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.hamcrest.CoreMatchers.anyOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -294,7 +292,7 @@ public class TransferTest extends ProtocolTestBase
                                                                                    Rejected.REJECTED_SYMBOL)
                                                              .attach().consumeResponse(Attach.class)
                                                              .consumeResponse(Flow.class)
-                                                             .setPayloadOnTransfer(messageEncoder.getPayload())
+                                                             .transferPayload(messageEncoder.getPayload())
                                                              .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                              .transfer()
                                                              .consumeResponse()
@@ -339,7 +337,7 @@ public class TransferTest extends ProtocolTestBase
                                                   .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
                                                   .attach().consumeResponse(Attach.class)
                                                   .consumeResponse(Flow.class)
-                                                  .setPayloadOnTransfer(messageEncoder.getPayload())
+                                                  .transferPayload(messageEncoder.getPayload())
                                                   .transferRcvSettleMode(ReceiverSettleMode.FIRST)
                                                   .transfer()
                                                   .consumeResponse()
@@ -780,6 +778,7 @@ public class TransferTest extends ProtocolTestBase
             interaction.transferDeliveryId(UnsignedInteger.ZERO)
                        .transferDeliveryTag(deliveryTag)
                        .transferPayloadData("test")
+                       .transferSettled(true)
                        .transfer()
                        .sync()
 
@@ -789,41 +788,6 @@ public class TransferTest extends ProtocolTestBase
                        .transfer()
                        .sync();
 
-            boolean firstSettled = false, secondSettled = false;
-            do
-            {
-                interaction.consumeResponse();
-                Response<?> response = interaction.getLatestResponse();
-                assertThat(response, is(notNullValue()));
-
-                Object body = response.getBody();
-
-                if (body instanceof Disposition)
-                {
-                    Disposition disposition = (Disposition) body;
-                    assertThat(disposition.getSettled(), is(equalTo(true)));
-                    assertThat(disposition.getFirst(),
-                               anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE)));
-                    assertThat(disposition.getLast(),
-                               anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE), nullValue()));
-
-                    if (UnsignedInteger.ZERO.equals(disposition.getFirst()))
-                    {
-                        firstSettled = true;
-                    }
-                    if (UnsignedInteger.ONE.equals(disposition.getFirst())
-                        || UnsignedInteger.ONE.equals(disposition.getLast()))
-                    {
-                        secondSettled = true;
-                    }
-                }
-                else if (!(body instanceof Flow))
-                {
-                    fail("Unexpected response " + body);
-                }
-            }
-            while (!firstSettled || !secondSettled);
-
             transport.doCloseConnection();
 
             assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 7168c47..6fe0b55 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -21,7 +21,6 @@
 package org.apache.qpid.tests.protocol.v1_0.transaction;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org