You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/30 16:19:53 UTC
[1/2] qpid-broker-j git commit: QPID-7649: [Java Broker] [AMQP1.0]
Add support for Attach with incomplete-unsettled
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 72ed1aa52 -> 737c52807
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
new file mode 100644
index 0000000..e85cbc3
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/ResumeDeliveriesTest.java
@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.transport.link;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.anyOf;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.isIn;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.typeCompatibleWith;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.hamcrest.Matchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+
+public class ResumeDeliveriesTest extends ProtocolTestBase
+{
+ private static final int MIN_MAX_FRAME_SIZE = 512;
+ private static final String TEST_MESSAGE_CONTENT = "foo";
+ private InetSocketAddress _brokerAddress;
+ private String _originalMmsMessageStorePersistence;
+
+ @Before
+ public void setUp()
+ { // Runs before each test: forces the persistence system property to "false" and creates the test queue.
+ _originalMmsMessageStorePersistence = System.getProperty("qpid.tests.mms.messagestore.persistence"); // remembered so tearDown() can restore it
+ System.setProperty("qpid.tests.mms.messagestore.persistence", "false");
+
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP); // address the tests connect to
+ }
+
+ @After
+ public void tearDown()
+ { // Runs after each test: puts the persistence system property back to the value captured in setUp().
+ if (_originalMmsMessageStorePersistence != null)
+ {
+ System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
+ }
+ else
+ { // the property was not set before the test, so remove it instead of leaving "false" behind
+ System.clearProperty("qpid.tests.mms.messagestore.persistence");
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.13",
+ description = "When a suspended link having unsettled deliveries is resumed, the unsettled field from the"
+ + " attach frame will carry the delivery-tags and delivery state of all deliveries"
+ + " considered unsettled by the issuing link endpoint.")
+ public void resumeSendingLinkSingleUnsettledDelivery() throws Exception
+ { // Sends one unsettled transfer, detaches, re-attaches, and checks the unsettled map in the broker's Attach.
+ final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+ final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8));
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+
+ final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class);
+
+ // 1. attach as sender with ReceiverSettleMode.SECOND
+ interaction.attachHandle(linkHandle1)
+ .attachRole(Role.SENDER)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachTargetAddress(destination)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+
+ // 2. send an unsettled delivery and expect an unsettled disposition from the receiver
+ final Disposition responseDisposition = interaction.transferHandle(linkHandle1)
+ .transferDeliveryId(UnsignedInteger.ZERO)
+ .transferDeliveryTag(deliveryTag)
+ .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transfer()
+ .consumeResponse()
+ .getLatestResponse(Disposition.class);
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+ final DeliveryState remoteDeliveryState = responseDisposition.getState(); // kept for the type comparison in step 5
+
+ // 3. detach the link
+ interaction.detach().consumeResponse(Detach.class);
+
+ // 4. resume the link on a new handle, declaring the delivery tag as unsettled
+ final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+ final Attach responseAttach2 = interaction.attachHandle(linkHandle2)
+ .attachUnsettled(Collections.singletonMap(deliveryTag, null)) // tag is reported with a null delivery state
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ // 5. assert content of unsettled map
+ assertThat(responseAttach2.getTarget(), is(notNullValue()));
+ final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+ assertThat(remoteUnsettled, is(notNullValue()));
+ assertThat(remoteUnsettled.keySet(), is(equalTo(Collections.singleton(deliveryTag))));
+ assertThat(remoteUnsettled.get(deliveryTag).getClass(), typeCompatibleWith(remoteDeliveryState.getClass())); // only the state's type is compared, not its content
+ assertThat(responseAttach2.getIncompleteUnsettled(), is(anyOf(nullValue(), equalTo(false)))); // absent or false are both accepted
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.7.3",
+ description = "If the local unsettled map is too large to be encoded within a frame of the agreed maximum"
+ + " frame size then the session MAY be ended with the frame-size-too-small error. The"
+ + " endpoint SHOULD make use of the ability to send an incomplete unsettled map (see below)"
+ + " to avoid sending an error.")
+ public void resumeSendingLinkWithIncompleteUnsettled() throws Exception
+ {
+ final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse();
+
+ // 0. Open connection with small max-frame-size
+ final Open open = interaction.openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
+ interaction.begin().consumeResponse(Begin.class);
+
+ // 1. attach with ReceiverSettleMode.SECOND
+ final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+ interaction.attachHandle(linkHandle1)
+ .attachRole(Role.SENDER)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachTargetAddress(destination)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+
+ // 2. send enough unsettled deliveries to cause incomplete-unsettled to be true
+ // assume each delivery requires at least 1 byte, therefore max-frame-size deliveries should be enough
+ interaction.transferHandle(linkHandle1)
+ .transferPayloadData(TEST_MESSAGE_CONTENT);
+ Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
+ for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
+ {
+ final Binary deliveryTag = new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
+ final Disposition responseDisposition = interaction.transferDeliveryId(UnsignedInteger.valueOf(i))
+ .transferDeliveryTag(deliveryTag)
+ .transfer()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+ localUnsettled.put(deliveryTag, null);
+ }
+
+ // 3. detach the link
+ interaction.detach().consumeResponse(Detach.class);
+
+ // 4. resume the link
+ final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+ final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+ Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+ localUnsettled.get(sampleLocalUnsettled));
+ final Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+ .attachUnsettled(unsettled)
+ .attachIncompleteUnsettled(true)
+ .attach().consumeResponse(End.class, Attach.class)
+ .getLatestResponse();
+
+ if (latestResponse.getBody() instanceof End)
+ {
+ // 5.a assert session end error
+ final End responseEnd = (End) latestResponse.getBody();
+ final Error error = responseEnd.getError();
+ assertThat(error, is(notNullValue()));
+ assertThat(error.getCondition().getValue(), is(equalTo(AmqpError.FRAME_SIZE_TOO_SMALL)));
+ }
+ else if (latestResponse.getBody() instanceof Attach)
+ {
+ // 5.b assert content of unsettled map
+ final Attach responseAttach2 = (Attach) latestResponse.getBody();
+ assertThat(responseAttach2.getTarget(), is(notNullValue()));
+ final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+ assertThat(remoteUnsettled, is(notNullValue()));
+ assertThat(remoteUnsettled.keySet(), is(not(empty())));
+ for (Binary deliveryTag : remoteUnsettled.keySet())
+ {
+ assertThat(deliveryTag, isIn(localUnsettled.keySet()));
+ }
+ assertThat(responseAttach2.getIncompleteUnsettled(), is(equalTo(true)));
+ }
+ else
+ {
+ fail(String.format("Unexpected response. Expected End or Attach. Got '%s'.", latestResponse.getBody()));
+ }
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.7.3", description =
+ "If set to true [incomplete-unsettled] indicates that the unsettled map provided is not complete. "
+ + "When the map is incomplete the recipient of the map cannot take the absence of a delivery tag from "
+ + "the map as evidence of settlement. On receipt of an incomplete unsettled map a sending endpoint MUST "
+ + "NOT send any new deliveries (i.e. deliveries where resume is not set to true) to its partner (and "
+ + "a receiving endpoint which sent an incomplete unsettled map MUST detach with an error on "
+ + "receiving a transfer which does not have the resume flag set to true).")
+ public void rejectNewDeliveryWhilstUnsettledIncomplete() throws Exception
+ { // After resuming with an incomplete unsettled map, sends a non-resume transfer and expects a Detach carrying an error.
+ final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse();
+
+ // 0. Open connection with small max-frame-size
+ final Open open = interaction.openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+ .open().consumeResponse()
+ .getLatestResponse(Open.class);
+ interaction.begin().consumeResponse(Begin.class);
+
+ // 1. attach as sender with ReceiverSettleMode.SECOND
+ final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+ interaction.attachHandle(linkHandle1)
+ .attachRole(Role.SENDER)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachTargetAddress(destination)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+
+ // 2. send enough unsettled deliveries to cause incomplete-unsettled to be true
+ // assume each delivery requires at least 1 byte, therefore max-frame-size deliveries should be enough
+ interaction.transferHandle(linkHandle1)
+ .transferPayloadData(TEST_MESSAGE_CONTENT);
+ Map<Binary, DeliveryState> localUnsettled = new HashMap<>(open.getMaxFrameSize().intValue());
+ for (int i = 0; i < open.getMaxFrameSize().intValue(); ++i)
+ {
+ final Binary deliveryTag = new Binary(String.valueOf(i).getBytes(StandardCharsets.UTF_8)); // tag is the decimal loop index
+ final Disposition responseDisposition = interaction.transferDeliveryId(UnsignedInteger.valueOf(i))
+ .transferDeliveryTag(deliveryTag)
+ .transfer()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+ localUnsettled.put(deliveryTag, null);
+ }
+
+ // 3. detach the link
+ interaction.detach().consumeResponse(Detach.class);
+
+ // 4. resume the link, sending only one of the local tags and flagging the map as incomplete
+ final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+ final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+ Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+ localUnsettled.get(sampleLocalUnsettled));
+ final Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+ .attachUnsettled(unsettled)
+ .attachIncompleteUnsettled(true)
+ .attach().consumeResponse(End.class, Attach.class)
+ .getLatestResponse();
+ assumeThat(latestResponse.getBody(), is(instanceOf(Attach.class))); // an End response skips the rest of the test instead of failing it
+
+ // 5. ensure attach has incomplete-unsettled
+ final Attach responseAttach = (Attach) latestResponse.getBody();
+ assertThat(responseAttach.getIncompleteUnsettled(), is(equalTo(true)));
+
+ // 6. send new transfer (resume flag not set) and expect the link to be detached with illegal-state
+ final Binary newDeliveryTag = new Binary("newTransfer".getBytes(StandardCharsets.UTF_8));
+ final Detach detachWithError = interaction.transferHandle(linkHandle2)
+ .transferDeliveryId(UnsignedInteger.ONE) // NOTE(review): delivery-id 1 was already used by the loop in step 2 on this session - confirm intended
+ .transferDeliveryTag(newDeliveryTag)
+ .transfer().consumeResponse()
+ .getLatestResponse(Detach.class);
+ assertThat(detachWithError.getError(), is(notNullValue()));
+ final Error detachError = detachWithError.getError();
+ assertThat(detachError.getCondition(), is(equalTo(AmqpError.ILLEGAL_STATE)));
+ }
+ }
+
+ @Ignore("QPID-7845")
+ @Test
+ @SpecificationTest(section = "2.7.3", description =
+ "If set to true [incomplete-unsettled] indicates that the unsettled map provided is not complete. "
+ + "When the map is incomplete the recipient of the map cannot take the absence of a delivery tag from "
+ + "the map as evidence of settlement. On receipt of an incomplete unsettled map a sending endpoint MUST "
+ + "NOT send any new deliveries (i.e. deliveries where resume is not set to true) to its partner (and "
+ + "a receiving endpoint which sent an incomplete unsettled map MUST detach with an error on "
+ + "receiving a transfer which does not have the resume flag set to true).")
+ public void incompleteUnsettledReceiving() throws Exception
+ { // Receiving-side variant (currently ignored): receives many unsettled transfers, resumes with an incomplete map, and expects the sampled delivery again.
+ for (int i = 0; i < MIN_MAX_FRAME_SIZE; i++)
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT + "-" + i);
+ }
+
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr).connect())
+ {
+ // 1. open with small max-frame=512, begin, attach receiver
+ // with rcv-settle-mode=second, snd-settle-mode=unsettled,
+ // flow with incoming-window=Integer.MAX_VALUE and link-credit=Integer.MAX_VALUE
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .openMaxFrameSize(UnsignedInteger.valueOf(MIN_MAX_FRAME_SIZE))
+ .open()
+ .consumeResponse()
+ .begin()
+ .consumeResponse(Begin.class)
+ .attachRole(Role.RECEIVER)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach()
+ .consumeResponse(Attach.class);
+
+ Attach attach = interaction.getLatestResponse(Attach.class);
+ assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED))); // skip the test if the broker did not echo snd-settle-mode=unsettled
+
+ interaction.flowIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+ .flowLinkCredit(UnsignedInteger.valueOf(Integer.MAX_VALUE))
+ .flowHandleFromLinkHandle()
+ .flow();
+
+ // 2. Receive transfers; i advances only when a Transfer arrives
+ final Map<Binary, DeliveryState> localUnsettled = new HashMap<>();
+ for (int i = 0; i < MIN_MAX_FRAME_SIZE; )
+ {
+ Response<?> response = interaction.consumeResponse().getLatestResponse();
+ if (response.getBody() instanceof Transfer)
+ {
+ assertThat(response.getBody(), Matchers.is(instanceOf(Transfer.class))); // always true here: the instanceof guard above already checked it
+ Transfer responseTransfer = (Transfer) response.getBody();
+ assertThat(responseTransfer.getMore(), is(not(equalTo(true))));
+ assertThat(responseTransfer.getSettled(), is(not(equalTo(true))));
+ localUnsettled.putIfAbsent(responseTransfer.getDeliveryTag(), responseTransfer.getState());
+ i++;
+ }
+ else if (response.getBody() instanceof Flow || response.getBody() instanceof Disposition)
+ {
+ // ignore
+ }
+ else
+ {
+ fail("Unexpected frame " + response.getBody());
+ }
+ }
+
+ // 3. detach the link
+ interaction.detach().consumeResponse(Detach.class);
+
+ // 4. resume the link, sending only one of the received tags and flagging the map as incomplete
+ final Binary sampleLocalUnsettled = localUnsettled.keySet().iterator().next();
+ Map<Binary, DeliveryState> unsettled = Collections.singletonMap(sampleLocalUnsettled,
+ localUnsettled.get(sampleLocalUnsettled));
+ final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+ Response<?> latestResponse = interaction.attachHandle(linkHandle2)
+ .attachUnsettled(unsettled)
+ .attachIncompleteUnsettled(true)
+ .attach().consumeResponse(End.class, Attach.class)
+ .getLatestResponse();
+ assumeThat(latestResponse.getBody(), is(instanceOf(Attach.class))); // an End response skips the rest of the test instead of failing it
+
+ final Attach resumingAttach = (Attach) latestResponse.getBody();
+ final Map<Binary, DeliveryState> remoteUnsettled = resumingAttach.getUnsettled();
+ assertThat(remoteUnsettled, is(notNullValue()));
+ assertThat(remoteUnsettled.keySet(), is(not(empty())));
+ for (Binary deliveryTag : remoteUnsettled.keySet())
+ {
+ assertThat(deliveryTag, isIn(localUnsettled.keySet()));
+ }
+ assertThat(resumingAttach.getIncompleteUnsettled(), is(equalTo(true)));
+
+ interaction.flowHandle(linkHandle2).flow();
+
+ boolean received = false; // loop until the first Transfer on the resumed link has been checked
+ while (!received)
+ {
+ Response<?> nextResponse = interaction.consumeResponse().getLatestResponse();
+ assertThat(nextResponse, is(notNullValue()));
+
+ if (nextResponse.getBody() instanceof Transfer)
+ {
+ assertThat(nextResponse.getBody(), is(instanceOf(Transfer.class))); // always true here: the instanceof guard above already checked it
+ Transfer responseTransfer = (Transfer) nextResponse.getBody();
+ assertThat(responseTransfer.getMore(), is(not(equalTo(true))));
+ assertThat(responseTransfer.getSettled(), is(not(equalTo(true))));
+ assertThat(responseTransfer.getDeliveryTag(), is(equalTo(sampleLocalUnsettled)));
+ received = true;
+ }
+ else if (nextResponse.getBody() instanceof Flow || nextResponse.getBody() instanceof Disposition)
+ {
+ // ignore
+ }
+ else
+ {
+ fail("Unexpected frame " + nextResponse.getBody());
+ }
+ }
+
+ transport.doCloseConnection();
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
+ + " the unsettled field from the attach frame will carry"
+ + " the delivery-tags and delivery state of all deliveries"
+ + " considered unsettled by the issuing link endpoint.")
+ public void resumeReceivingLinkEmptyUnsettled() throws Exception
+ { // Receives and settles one message, detaches, re-attaches with an empty unsettled map, and expects an empty map back.
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ONE) // credit for exactly one delivery
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ interaction.dispositionSettled(true) // settle the delivery before detaching so nothing is left unsettled locally
+ .dispositionRole(Role.RECEIVER)
+ .disposition();
+
+ Detach detach = interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+ assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false))); // the link must be detached, not closed
+
+ interaction.attachUnsettled(new HashMap<>()) // resume with an empty unsettled map
+ .attach()
+ .consumeResponse(Attach.class);
+
+ Attach attach = interaction.getLatestResponse(Attach.class);
+
+ Map<Binary, DeliveryState> unsettled = attach.getUnsettled();
+ assertThat(unsettled.entrySet(), empty()); // NOTE(review): throws NullPointerException if the broker omits the unsettled field - confirm it is always sent
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
+ + " the unsettled field from the attach frame will carry"
+ + " the delivery-tags and delivery state of all deliveries"
+ + " considered unsettled by the issuing link endpoint.")
+ public void resumeReceivingLinkWithSingleUnsettledAccepted() throws Exception
+ { // Receives one unsettled message, detaches, resumes reporting the tag as Accepted, and expects a payload-free resume transfer.
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+ .attach().consumeResponse();
+
+ Attach attach = interaction.getLatestResponse(Attach.class);
+ assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED))); // skip the test if the broker did not echo snd-settle-mode=unsettled
+
+ interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery();
+
+ List<Transfer> transfers = interaction.getLatestDelivery();
+ assertThat(transfers, hasSize(1));
+ Transfer transfer = transfers.get(0);
+
+ Binary deliveryTag = transfer.getDeliveryTag();
+ assertThat(deliveryTag, is(notNullValue()));
+ assertThat(transfer.getSettled(), is(not(equalTo(true))));
+ Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ Detach detach = interaction.detach().consumeResponse().getLatestResponse(Detach.class);
+ assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false))); // the link must be detached, not closed
+
+ interaction.attachUnsettled(Collections.singletonMap(deliveryTag, new Accepted())) // resume, reporting the delivery with an Accepted state
+ .attach()
+ .consumeResponse(Attach.class);
+
+ Attach resumeAttach = interaction.getLatestResponse(Attach.class);
+
+ Map<Binary, DeliveryState> unsettled = resumeAttach.getUnsettled();
+ assertThat(unsettled, is(notNullValue()));
+ assertThat(unsettled.entrySet(), hasSize(1));
+ Map.Entry<Binary, DeliveryState> entry = unsettled.entrySet().iterator().next();
+ assertThat(entry.getKey(), is(equalTo(deliveryTag))); // only the tag is checked; the reported state is not asserted
+
+ interaction.flowNextIncomingId(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery();
+
+ transfers = interaction.getLatestDelivery();
+ assertThat(transfers, hasSize(1));
+ Transfer resumeTransfer = transfers.get(0);
+
+ assertThat(resumeTransfer.getResume(), is(equalTo(true)));
+ assertThat(resumeTransfer.getDeliveryTag(), is(equalTo(deliveryTag)));
+ assertThat(resumeTransfer.getPayload(), is(nullValue())); // the resumed transfer is expected to carry no payload
+
+ if (!Boolean.TRUE.equals(resumeTransfer.getSettled()))
+ { // settle it ourselves only when the resumed transfer did not arrive already settled
+ interaction.dispositionSettled(true)
+ .dispositionState(new Accepted())
+ .dispositionRole(Role.RECEIVER)
+ .disposition();
+ }
+
+ transport.doCloseConnection();
+
+ if (getBrokerAdmin().isQueueDepthSupported())
+ { // the message must be gone from the queue once the delivery is settled
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
+ is(equalTo(0)));
+ }
+ }
+ }
+
+ @Ignore("QPID-7845")
+ @Test
+ @SpecificationTest(section = "2.6.13", description = "When a suspended link having unsettled deliveries is resumed,"
+ + " the unsettled field from the attach frame will carry"
+ + " the delivery-tags and delivery state of all deliveries"
+ + " considered unsettled by the issuing link endpoint.")
+ public void resumeReceivingLinkOneUnsettledWithNoOutcome() throws Exception
+ { // Currently ignored: receives one unsettled message, resumes reporting the tag with no state, and expects the payload to be re-sent.
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attachSndSettleMode(SenderSettleMode.UNSETTLED)
+ .attach().consumeResponse();
+
+ Attach attach = interaction.getLatestResponse(Attach.class);
+ assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.UNSETTLED))); // skip the test if the broker did not echo snd-settle-mode=unsettled
+
+ interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery();
+
+ List<Transfer> transfers = interaction.getLatestDelivery();
+ assertThat(transfers, hasSize(1));
+ Transfer transfer = transfers.get(0);
+
+ Binary deliveryTag = transfer.getDeliveryTag();
+ assertThat(deliveryTag, is(notNullValue()));
+ Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ Detach detach = interaction.detach().consumeResponse(Detach.class).getLatestResponse(Detach.class);
+ assertThat(detach.getClosed(), anyOf(nullValue(), equalTo(false))); // the link must be detached, not closed
+
+ interaction.attachUnsettled(Collections.singletonMap(deliveryTag, null)) // resume, reporting the tag with a null delivery state
+ .attach()
+ .consumeResponse(Attach.class);
+
+ Attach resumeAttach = interaction.getLatestResponse(Attach.class);
+
+ Map<Binary, DeliveryState> unsettled = resumeAttach.getUnsettled();
+ assertThat(unsettled, is(notNullValue()));
+ assertThat(unsettled.entrySet(), hasSize(1));
+ Map.Entry<Binary, DeliveryState> entry = unsettled.entrySet().iterator().next();
+ assertThat(entry.getKey(), is(equalTo(deliveryTag))); // only the tag is checked; the reported state is not asserted
+
+ interaction.flowNextIncomingId(UnsignedInteger.ONE)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery();
+
+ transfers = interaction.getLatestDelivery();
+ assertThat(transfers, hasSize(1));
+ Transfer resumeTransfer = transfers.get(0);
+
+ assertThat(resumeTransfer.getResume(), is(equalTo(true)));
+ assertThat(resumeTransfer.getDeliveryTag(), is(equalTo(deliveryTag)));
+ assertThat(resumeTransfer.getPayload(), is(notNullValue())); // here the resumed transfer is expected to carry a payload
+
+ interaction.dispositionSettled(true)
+ .dispositionState(new Accepted())
+ .dispositionRole(Role.RECEIVER)
+ .disposition();
+
+ transport.doCloseConnection();
+
+ if (getBrokerAdmin().isQueueDepthSupported())
+ { // the message must be gone from the queue once the delivery is settled
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME),
+ Matchers.is(Matchers.equalTo(0)));
+ }
+ }
+ }
+
+ @Ignore("QPID-7845")
+ @Test
+ @SpecificationTest(section = "2.6.13",
+ description = "When a suspended link having unsettled deliveries is resumed, the unsettled field from the"
+ + " attach frame will carry the delivery-tags and delivery state of all deliveries"
+ + " considered unsettled by the issuing link endpoint.")
+ public void resumeSendingLinkSinglePartialDelivery() throws Exception
+ { // Currently ignored: sends the first part of a delivery, detaches, resumes, sends the remaining part, and waits for a settled disposition.
+ final String destination = BrokerAdmin.TEST_QUEUE_NAME;
+ final Binary deliveryTag = new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8));
+
+ QpidByteBuffer[] messagePayload = Utils.splitPayload("testData1", 2); // presumably yields two payload parts; elements [0] and [1] are used below - confirm
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+
+ final UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+ final Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class);
+
+ // 1. attach as sender (no rcv-settle-mode is set explicitly here)
+ interaction.attachHandle(linkHandle1)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(destination)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class);
+
+ // 2. send a partial delivery (more=true, first payload part only)
+ interaction.transferHandle(linkHandle1)
+ .transferDeliveryId(UnsignedInteger.ZERO)
+ .transferDeliveryTag(deliveryTag)
+ .transferMore(true)
+ .transferPayload(Collections.singletonList(messagePayload[0]))
+ .transfer();
+
+ // 3. detach the link
+ interaction.detach().consumeResponse(Detach.class);
+
+ // 4. resume the link on a new handle, declaring the delivery tag as unsettled
+ final UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+ final Attach responseAttach2 = interaction.attachHandle(linkHandle2)
+ .attachUnsettled(Collections.singletonMap(deliveryTag, null))
+ .attach().consumeResponse()
+ .getLatestResponse(Attach.class);
+
+ // 5. assert content of unsettled map
+ assertThat(responseAttach2.getTarget(), is(notNullValue()));
+
+ final Map<Binary, DeliveryState> remoteUnsettled = responseAttach2.getUnsettled();
+ assertThat(remoteUnsettled, is(notNullValue()));
+ assertThat(remoteUnsettled.keySet(), is(equalTo(Collections.singleton(deliveryTag))));
+
+ interaction.transferHandle(linkHandle2) // two transfers with resume=true; the second carries more=false and the remaining payload part
+ .transferResume(true)
+ .transfer()
+ .sync()
+ .transferMore(false)
+ .transferPayload(Collections.singletonList(messagePayload[1]))
+ .transfer();
+
+ boolean settled = false; // loop until a Disposition arrives; Flow frames are skipped, anything else fails the test
+ do
+ {
+ interaction.consumeResponse();
+ Response<?> response = interaction.getLatestResponse();
+ assertThat(response, is(notNullValue()));
+
+ Object body = response.getBody();
+
+ if (body instanceof Disposition)
+ {
+ Disposition disposition = (Disposition) body;
+ assertThat(disposition.getSettled(), is(Matchers.equalTo(true)));
+ assertThat(disposition.getFirst(), equalTo(UnsignedInteger.ZERO)); // must refer to delivery-id 0 used in step 2
+ settled = true;
+ }
+ else if (!(body instanceof Flow))
+ {
+ fail("Unexpected response " + body);
+ }
+ }
+ while (!settled);
+
+ transport.doCloseConnection();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
----------------------------------------------------------------------
diff --git a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
index 3868246..e313222 100644
--- a/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
+++ b/systests/src/test/java/org/apache/qpid/systest/prefetch/ZeroPrefetchTest.java
@@ -63,7 +63,7 @@ public class ZeroPrefetchTest extends QpidBrokerTestCase
producer.send(secondMessage);
- Message receivedMessage = prefetch1consumer.receive(2000l);
+ final Message receivedMessage = prefetch1consumer.receive(2000l);
assertNotNull("First message was not received", receivedMessage);
assertEquals("Message property was not as expected", firstPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
@@ -73,9 +73,9 @@ public class ZeroPrefetchTest extends QpidBrokerTestCase
final Session prefetch2session = prefetch2Connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer prefetch2consumer = prefetch2session.createConsumer(queue);
- receivedMessage = prefetch2consumer.receive(2000l);
- assertNotNull("Second message was not received", receivedMessage);
- assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage.getStringProperty(TEST_PROPERTY_NAME));
+ final Message receivedMessage2 = prefetch2consumer.receive(2000l);
+ assertNotNull("Second message was not received", receivedMessage2);
+ assertEquals("Message property was not as expected", secondPropertyValue, receivedMessage2.getStringProperty(TEST_PROPERTY_NAME));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-broker-j git commit: QPID-7649: [Java Broker] [AMQP1.0]
Add support for Attach with incomplete-unsettled
Posted by or...@apache.org.
QPID-7649: [Java Broker] [AMQP1.0] Add support for Attach with incomplete-unsettled
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/737c5280
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/737c5280
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/737c5280
Branch: refs/heads/master
Commit: 737c52807080b2e54fa6f4b419c0086df375e2bc
Parents: 72ed1aa
Author: Alex Rudyy <or...@apache.org>
Authored: Fri Jun 30 17:15:04 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Fri Jun 30 17:15:04 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AbstractLinkEndpoint.java | 82 +-
.../v1_0/AbstractReceivingLinkEndpoint.java | 60 +-
.../protocol/v1_0/SendingLinkEndpoint.java | 3 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 6 +-
.../v1_0/StandardReceivingLinkEndpoint.java | 25 +-
.../protocol/v1_0/framing/FrameHandler.java | 1 +
.../v1_0/framing/OversizeFrameException.java | 1 +
.../protocol/v1_0/type/messaging/Accepted.java | 30 +-
.../protocol/v1_0/type/messaging/Modified.java | 47 +-
.../protocol/v1_0/type/messaging/Received.java | 24 +-
.../protocol/v1_0/type/messaging/Rejected.java | 40 +-
.../protocol/v1_0/type/messaging/Released.java | 29 +-
.../v1_0/type/transaction/Declared.java | 31 +-
.../type/transaction/TransactionalState.java | 25 +-
.../server/protocol/v1_0/Session_1_0Test.java | 10 +
.../tests/protocol/v1_0/FrameTransport.java | 2 +-
.../qpid/tests/protocol/v1_0/Interaction.java | 77 +-
.../apache/qpid/tests/protocol/v1_0/Utils.java | 48 ++
.../v1_0/messaging/MultiTransferTest.java | 91 +--
.../protocol/v1_0/messaging/TransferTest.java | 42 +-
.../transaction/TransactionalTransferTest.java | 1 -
.../transport/link/ResumeDeliveriesTest.java | 766 +++++++++++++++++++
.../qpid/systest/prefetch/ZeroPrefetchTest.java | 8 +-
23 files changed, 1124 insertions(+), 325 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 55432c9..5410766 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -24,6 +24,8 @@ package org.apache.qpid.server.protocol.v1_0;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -31,6 +33,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -38,8 +41,11 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
@@ -49,6 +55,8 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseTarget> implements LinkEndpoint<S, T>
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
+ private static final int FRAME_HEADER_SIZE = 8;
+
private final Link_1_0<S, T> _link;
private final Session_1_0 _session;
@@ -66,6 +74,9 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
private volatile Map<Symbol, Object> _properties;
private volatile State _state = State.ATTACH_RECVD;
+ protected boolean _remoteIncompleteUnsettled;
+ protected boolean _localIncompleteUnsettled;
+
protected enum State
{
DETACHED,
@@ -129,7 +140,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
_receivingSettlementMode = attach.getRcvSettleMode();
_properties = initProperties(attach);
_state = State.ATTACH_RECVD;
-
+ _remoteIncompleteUnsettled = Boolean.TRUE.equals(attach.getIncompleteUnsettled());
if (getRole() == Role.RECEIVER)
{
getSession().getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
@@ -306,6 +317,8 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
attachToSend.setInitialDeliveryCount(_deliveryCount.unsignedIntegerValue());
}
+ attachToSend = handleOversizedUnsettledMapIfNecessary(attachToSend);
+
switch (_state)
{
case DETACHED:
@@ -322,6 +335,73 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
}
+    /**
+     * Shrinks the unsettled map of an outgoing Attach when the encoded frame would not fit
+     * into the connection's maximum frame size.
+     * <p>
+     * If the Attach (plus the frame header) already fits, it is returned untouched and
+     * {@code _localIncompleteUnsettled} is cleared. Otherwise the Attach is flagged as
+     * incomplete-unsettled and a binary search over the number of unsettled entries (taken in
+     * the map's iteration order) finds the largest prefix whose encoded frame does not exceed
+     * the limit. If not even a single entry fits, the session is ended with
+     * {@link AmqpError#FRAME_SIZE_TOO_SMALL}; the Attach is still returned to the caller in that case.
+     *
+     * @param attachToSend the Attach about to be sent; it is modified in place
+     * @return the same {@code attachToSend} instance, possibly with a reduced unsettled map
+     */
+    private Attach handleOversizedUnsettledMapIfNecessary(final Attach attachToSend)
+    {
+        final AMQPDescribedTypeRegistry describedTypeRegistry = getSession().getConnection().getDescribedTypeRegistry();
+        final ValueWriter<Attach> valueWriter = describedTypeRegistry.getValueWriter(attachToSend);
+        // The limit is read once and used for both the initial check and the search below.
+        final int targetSize = getSession().getConnection().getMaxFrameSize();
+        if (valueWriter.getEncodedSize() + FRAME_HEADER_SIZE <= targetSize)
+        {
+            _localIncompleteUnsettled = false;
+            return attachToSend;
+        }
+
+        _localIncompleteUnsettled = true;
+        attachToSend.setIncompleteUnsettled(true);
+
+        Map<Binary, DeliveryState> localUnsettledMap = attachToSend.getUnsettled();
+        if (localUnsettledMap == null)
+        {
+            localUnsettledMap = Collections.emptyMap();
+        }
+
+        // Binary search over the number of entries to keep: lowIndex is the largest count known
+        // to fit (or 0), highIndex the smallest count known not to fit (or the full size).
+        int lowIndex = 0;
+        int highIndex = localUnsettledMap.size();
+        int currentIndex = (highIndex - lowIndex) / 2;
+        int oldIndex;
+        HashMap<Binary, DeliveryState> unsettledMap = null;
+        int totalSize;
+        do
+        {
+            // Build a candidate map from the first currentIndex entries.
+            HashMap<Binary, DeliveryState> partialUnsettledMap = new HashMap<>(currentIndex);
+            final Iterator<Map.Entry<Binary, DeliveryState>> iterator = localUnsettledMap.entrySet().iterator();
+            for (int i = 0; i < currentIndex; ++i)
+            {
+                final Map.Entry<Binary, DeliveryState> entry = iterator.next();
+                partialUnsettledMap.put(entry.getKey(), entry.getValue());
+            }
+            attachToSend.setUnsettled(partialUnsettledMap);
+            totalSize = describedTypeRegistry.getValueWriter(attachToSend).getEncodedSize() + FRAME_HEADER_SIZE;
+            if (totalSize > targetSize)
+            {
+                highIndex = currentIndex;
+            }
+            else if (totalSize < targetSize)
+            {
+                lowIndex = currentIndex;
+                unsettledMap = partialUnsettledMap;
+            }
+            else
+            {
+                // Exact fit: nothing larger can fit, so collapse the search interval.
+                lowIndex = highIndex = currentIndex;
+                unsettledMap = partialUnsettledMap;
+            }
+
+            oldIndex = currentIndex;
+            currentIndex = lowIndex + (highIndex - lowIndex) / 2;
+        }
+        while (oldIndex != currentIndex);
+
+        if (unsettledMap == null || unsettledMap.isEmpty())
+        {
+            final End endWithError = new End();
+            endWithError.setError(new Error(AmqpError.FRAME_SIZE_TOO_SMALL, "Cannot fit a single unsettled delivery into Attach frame."));
+            getSession().end(endWithError);
+        }
+
+        // Replace the last probed candidate (which may have been too large) with the best fit;
+        // this is null when no candidate fitted at all.
+        attachToSend.setUnsettled(unsettledMap);
+        return attachToSend;
+    }
+
+
public void detach()
{
detach(null, false);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index e87fffb..0b78622 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -117,16 +117,8 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
{
_unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
}
- else if (!_unsettled.containsKey(_currentDelivery.getDeliveryTag()))
- {
- final Error error = new Error(AmqpError.ILLEGAL_STATE,
- String.format("Resumed transfer with delivery tag '%s' is not found.",
- _currentDelivery.getDeliveryTag()));
- close(error);
- return;
- }
- if (_currentDelivery.isAborted())
+ if (_currentDelivery.isAborted() || (_currentDelivery.getResume() && !_unsettled.containsKey(_currentDelivery.getDeliveryTag())))
{
_unsettled.remove(_currentDelivery.getDeliveryTag());
getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
@@ -176,14 +168,23 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
error = new Error(AmqpError.INVALID_FIELD,
"Transfer \"delivery-tag\" is required for a new delivery.");
}
- else if (_unsettled.containsKey(transfer.getDeliveryTag()))
+ else if (!Boolean.TRUE.equals(transfer.getResume()))
{
- error = new Error(AmqpError.ILLEGAL_STATE,
- String.format("Delivery-tag '%s' is used by another unsettled delivery."
- + " The delivery-tag MUST be unique amongst all deliveries that"
- + " could be considered unsettled by either end of the link.",
- transfer.getDeliveryTag()));
+ if (_unsettled.containsKey(transfer.getDeliveryTag()))
+ {
+ error = new Error(AmqpError.ILLEGAL_STATE,
+ String.format("Delivery-tag '%s' is used by another unsettled delivery."
+ + " The delivery-tag MUST be unique amongst all deliveries that"
+ + " could be considered unsettled by either end of the link.",
+ transfer.getDeliveryTag()));
+ }
+ else if (_localIncompleteUnsettled || _remoteIncompleteUnsettled)
+ {
+ error = new Error(AmqpError.ILLEGAL_STATE,
+ "Cannot accept new deliveries while incomplete-unsettled is true.");
+ }
}
+
return error;
}
@@ -297,20 +298,6 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
sendFlowConditional();
}
- @Override
- public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
- super.receiveDeliveryState(deliveryTag, state, settled);
- if(_creditWindow)
- {
- if(Boolean.TRUE.equals(settled))
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- sendFlowConditional();
- }
- }
- }
-
SectionDecoder getSectionDecoder()
{
return _sectionDecoder;
@@ -321,11 +308,11 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
{
super.settle(deliveryTag);
_unsettled.remove(deliveryTag);
- if(_creditWindow)
+ if (_creditWindow)
{
- sendFlowConditional();
+ setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+ sendFlowConditional();
}
-
}
public void flowStateChanged()
@@ -341,13 +328,10 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
}
finally
{
- if (close)
+ if (_currentDelivery != null)
{
- if (_currentDelivery != null)
- {
- _currentDelivery.discard();
- _currentDelivery = null;
- }
+ _currentDelivery.discard();
+ _currentDelivery = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index e6763eb..a78f7f0 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -644,11 +644,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
Map<Binary, DeliveryState> remoteUnsettled =
attach.getUnsettled() == null ? Collections.emptyMap() : new HashMap<>(attach.getUnsettled());
+ final boolean isUnsettledComplete = !Boolean.TRUE.equals(attach.getIncompleteUnsettled());
for (Map.Entry<Binary, OutgoingDelivery> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
final MessageInstance queueEntry = entry.getValue().getMessageInstance();
- if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+ if (!remoteUnsettled.containsKey(deliveryTag) && isUnsettledComplete)
{
queueEntry.setRedelivered();
queueEntry.release(oldConsumer);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 65a5265..ba06376 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -311,7 +311,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
try
{
List<QpidByteBuffer> payload = xfr.getPayload();
- final long remaining = QpidByteBufferUtils.remaining(payload);
+ final long remaining = payload == null ? 0 : QpidByteBufferUtils.remaining(payload);
int payloadSent = _connection.sendFrame(_sendingChannel, xfr, payload);
if(payload != null && payloadSent < remaining && payloadSent >= 0)
@@ -320,11 +320,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
Transfer secondTransfer = new Transfer();
- secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
secondTransfer.setHandle(xfr.getHandle());
- secondTransfer.setSettled(xfr.getSettled());
+ secondTransfer.setRcvSettleMode(xfr.getRcvSettleMode());
secondTransfer.setState(xfr.getState());
- secondTransfer.setMessageFormat(xfr.getMessageFormat());
secondTransfer.setPayload(payload);
sendTransfer(secondTransfer, endpoint, false);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 048fa20..447e7cb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -45,6 +45,7 @@ import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy;
import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
@@ -270,8 +271,11 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
@Override
protected void remoteDetachedPerformDetach(Detach detach)
{
- if(!TerminusDurability.UNSETTLED_STATE.equals(getDurability()) ||
- (detach != null && Boolean.TRUE.equals(detach.getClosed())))
+ final TerminusExpiryPolicy expiryPolicy = getTarget().getExpiryPolicy();
+ if((detach != null && Boolean.TRUE.equals(detach.getClosed()))
+ || TerminusExpiryPolicy.LINK_DETACH.equals(expiryPolicy)
+ || (TerminusExpiryPolicy.SESSION_END.equals(expiryPolicy) && getSession().isClosing())
+ || (TerminusExpiryPolicy.CONNECTION_CLOSE.equals(expiryPolicy) && getSession().getConnection().isClosing()))
{
close();
}
@@ -332,6 +336,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
target.setCapabilities(targetCapabilities.toArray(new Symbol[targetCapabilities.size()]));
}
+ target.setExpiryPolicy(attachTarget.getExpiryPolicy());
final ReceivingDestination destination = getSession().getReceivingDestination(getLink(), target);
@@ -341,17 +346,19 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
setCapabilities(targetCapabilities);
setDestination(destination);
- Map remoteUnsettled = attach.getUnsettled();
- Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
- for(Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
+ if (!Boolean.TRUE.equals(attach.getIncompleteUnsettled()))
{
- Binary deliveryTag = entry.getKey();
- if(remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+ Map remoteUnsettled = attach.getUnsettled();
+ Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
+ for (Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
{
- _unsettled.remove(deliveryTag); // todo: removal is based on assumption that remote unsettled map is complete
+ Binary deliveryTag = entry.getKey();
+ if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
+ {
+ _unsettled.remove(deliveryTag);
+ }
}
}
-
getLink().setTermini(source, target);
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
index c5e0334..581c3c5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java
@@ -59,6 +59,7 @@ public class FrameHandler implements ProtocolHandler
{
try
{
+ LOGGER.debug("RECV {} bytes", in.remaining());
Error frameParsingError = null;
int size;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
index e4a94bf..a77e587 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/OversizeFrameException.java
@@ -26,6 +26,7 @@ public class OversizeFrameException extends RuntimeException
public OversizeFrameException(final AMQFrame frame, final int size)
{
+ super("Tried to send frame of size: " + String.valueOf(size));
_frame = frame;
_size = size;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
index fb9f446..abf8509 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Accepted.java
@@ -23,31 +23,25 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+public class Accepted implements Outcome
+{
+ public static final Symbol ACCEPTED_SYMBOL = Symbol.valueOf("amqp:accepted:list");
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Accepted
- implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
- {
-
- public static final Symbol ACCEPTED_SYMBOL = Symbol.valueOf("amqp:accepted:list");
-
- @Override
- public Symbol getSymbol()
- {
- return ACCEPTED_SYMBOL;
- }
+ @Override
+ public Symbol getSymbol()
+ {
+ return ACCEPTED_SYMBOL;
+ }
- @Override
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Accepted{");
- final int origLength = builder.length();
builder.append('}');
return builder.toString();
}
-
-
- }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
index 43264a9..41d2fa7 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Modified.java
@@ -24,21 +24,18 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
-
import java.util.Map;
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Modified
- implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
- {
-
+public class Modified implements Outcome
+{
+ public static final Symbol MODIFIED_SYMBOL = Symbol.valueOf("amqp:modified:list");
- public static final Symbol MODIFIED_SYMBOL = Symbol.valueOf("amqp:modified:list");
-
- @CompositeTypeField
- private Boolean _deliveryFailed;
+ @CompositeTypeField
+ private Boolean _deliveryFailed;
@CompositeTypeField
private Boolean _undeliverableHere;
@@ -76,39 +73,39 @@ public class Modified
_messageAnnotations = messageAnnotations;
}
- @Override
- public Symbol getSymbol()
- {
- return MODIFIED_SYMBOL;
- }
+ @Override
+ public Symbol getSymbol()
+ {
+ return MODIFIED_SYMBOL;
+ }
- @Override
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Modified{");
final int origLength = builder.length();
- if(_deliveryFailed != null)
+ if (_deliveryFailed != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
builder.append("deliveryFailed=").append(_deliveryFailed);
}
- if(_undeliverableHere != null)
+ if (_undeliverableHere != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
builder.append("undeliverableHere=").append(_undeliverableHere);
}
- if(_messageAnnotations != null)
+ if (_messageAnnotations != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
@@ -118,6 +115,4 @@ public class Modified
builder.append('}');
return builder.toString();
}
-
-
- }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
index 3ad8cb7..86d9554 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Received.java
@@ -24,13 +24,13 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Received
- implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState
- {
-
+public class Received implements DeliveryState
+{
@CompositeTypeField(mandatory = true)
private UnsignedInteger _sectionNumber;
@@ -63,18 +63,18 @@ public class Received
StringBuilder builder = new StringBuilder("Received{");
final int origLength = builder.length();
- if(_sectionNumber != null)
+ if (_sectionNumber != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
builder.append("sectionNumber=").append(_sectionNumber);
}
- if(_sectionOffset != null)
+ if (_sectionOffset != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
@@ -84,6 +84,4 @@ public class Received
builder.append('}');
return builder.toString();
}
-
-
- }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
index 5f5ca24..e877a56 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Rejected.java
@@ -24,21 +24,17 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
-
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+public class Rejected implements Outcome
+{
+ public static final Symbol REJECTED_SYMBOL = Symbol.valueOf("amqp:rejected:list");
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class Rejected
- implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
- {
-
-
- public static final Symbol REJECTED_SYMBOL = Symbol.valueOf("amqp:rejected:list");
-
- @CompositeTypeField
- private Error _error;
+ @CompositeTypeField
+ private Error _error;
public Error getError()
{
@@ -50,21 +46,21 @@ public class Rejected
_error = error;
}
- @Override
- public Symbol getSymbol()
- {
- return REJECTED_SYMBOL;
- }
+ @Override
+ public Symbol getSymbol()
+ {
+ return REJECTED_SYMBOL;
+ }
- @Override
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Rejected{");
final int origLength = builder.length();
- if(_error != null)
+ if (_error != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
@@ -74,6 +70,4 @@ public class Rejected
builder.append('}');
return builder.toString();
}
-
-
- }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
index 4ba74f4..945c7ce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/messaging/Released.java
@@ -24,23 +24,20 @@
package org.apache.qpid.server.protocol.v1_0.type.messaging;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.*;
+public class Released implements Outcome
+{
+ public static final Symbol RELEASED_SYMBOL = Symbol.valueOf("amqp:released:list");
-public class Released
- implements org.apache.qpid.server.protocol.v1_0.type.DeliveryState, Outcome
- {
-
-
- public static final Symbol RELEASED_SYMBOL = Symbol.valueOf("amqp:released:list");
-
- @Override
- public Symbol getSymbol()
- {
- return RELEASED_SYMBOL;
- }
+ @Override
+ public Symbol getSymbol()
+ {
+ return RELEASED_SYMBOL;
+ }
- @Override
+ @Override
public String toString()
{
StringBuilder builder = new StringBuilder("Released{");
@@ -49,6 +46,4 @@ public class Released
builder.append('}');
return builder.toString();
}
-
-
- }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
index 17c4e62..a9b9343 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/Declared.java
@@ -24,22 +24,17 @@
package org.apache.qpid.server.protocol.v1_0.type.transaction;
-
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-public class Declared
- implements DeliveryState, Outcome
- {
-
-
- public static final Symbol DECLARED_SYMBOL = Symbol.valueOf("amqp:declared:list");
+public class Declared implements Outcome
+{
+ public static final Symbol DECLARED_SYMBOL = Symbol.valueOf("amqp:declared:list");
- @CompositeTypeField(mandatory = true)
- private Binary _txnId;
+ @CompositeTypeField(mandatory = true)
+ private Binary _txnId;
public Binary getTxnId()
{
@@ -57,9 +52,9 @@ public class Declared
StringBuilder builder = new StringBuilder("Declared{");
final int origLength = builder.length();
- if(_txnId != null)
+ if (_txnId != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
@@ -71,9 +66,9 @@ public class Declared
}
- @Override
- public Symbol getSymbol()
- {
- return DECLARED_SYMBOL;
- }
- }
+ @Override
+ public Symbol getSymbol()
+ {
+ return DECLARED_SYMBOL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
index 1f2d29b..20db2a1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transaction/TransactionalState.java
@@ -24,14 +24,13 @@
package org.apache.qpid.server.protocol.v1_0.type.transaction;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.*;
-
-public class TransactionalState
- implements DeliveryState
- {
-
-
+public class TransactionalState implements DeliveryState
+{
@CompositeTypeField(mandatory = true)
private Binary _txnId;
@@ -64,18 +63,18 @@ public class TransactionalState
StringBuilder builder = new StringBuilder("TransactionalState{");
final int origLength = builder.length();
- if(_txnId != null)
+ if (_txnId != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
builder.append("txnId=").append(_txnId);
}
- if(_outcome != null)
+ if (_outcome != null)
{
- if(builder.length() != origLength)
+ if (builder.length() != origLength)
{
builder.append(',');
}
@@ -85,6 +84,4 @@ public class TransactionalState
builder.append('}');
return builder.toString();
}
-
-
- }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
index 4a22d8e..468fe15 100644
--- a/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
+++ b/broker-plugins/amqp-1-0-protocol/src/test/java/org/apache/qpid/server/protocol/v1_0/Session_1_0Test.java
@@ -62,6 +62,7 @@ import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
@@ -79,6 +80,13 @@ import org.apache.qpid.test.utils.QpidTestCase;
public class Session_1_0Test extends QpidTestCase
{
+ private static final AMQPDescribedTypeRegistry DESCRIBED_TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer()
+ .registerExtensionSoleconnLayer();
+
private static final String TOPIC_NAME = "testTopic";
private static final String QUEUE_NAME = "testQueue";
private static final Symbol TOPIC_CAPABILITY = Symbol.getSymbol("topic");
@@ -721,6 +729,8 @@ public class Session_1_0Test extends QpidTestCase
when(connection.getContextValue(Long.class, Session.PRODUCER_AUTH_CACHE_TIMEOUT)).thenReturn(Session.PRODUCER_AUTH_CACHE_TIMEOUT_DEFAULT);
when(connection.getContextValue(Integer.class, Session.PRODUCER_AUTH_CACHE_SIZE)).thenReturn(Session.PRODUCER_AUTH_CACHE_SIZE_DEFAULT);
when(connection.getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).thenReturn(Connection.DEFAULT_MAX_UNCOMMITTED_IN_MEMORY_SIZE);
+ when(connection.getDescribedTypeRegistry()).thenReturn(DESCRIBED_TYPE_REGISTRY);
+ when(connection.getMaxFrameSize()).thenReturn(512);
final ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
when(connection.doOnIOThreadAsync(runnableCaptor.capture())).thenAnswer(new Answer<ListenableFuture<Void>>()
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
index 9aeb4e9..8746941 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java
@@ -62,7 +62,7 @@ public class FrameTransport implements AutoCloseable
public static final long RESPONSE_TIMEOUT = 6000;
private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
- private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(100);
+ private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
private final EventLoopGroup _workerGroup;
private final InetSocketAddress _brokerAddress;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index 008b016..3a6b083 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -228,15 +228,15 @@ public class Interaction
return this;
}
- public Interaction openChannelMax(UnsignedShort channelMax)
+ public Interaction openMaxFrameSize(final UnsignedInteger maxFrameSize)
{
- _open.setChannelMax(channelMax);
+ _open.setMaxFrameSize(maxFrameSize);
return this;
}
- public Interaction openMaxFrameSize(UnsignedInteger maxFrameSize)
+ public Interaction openChannelMax(UnsignedShort channelMax)
{
- _open.setMaxFrameSize(maxFrameSize);
+ _open.setChannelMax(channelMax);
return this;
}
@@ -397,6 +397,18 @@ public class Interaction
return this;
}
+ public Interaction attachUnsettled(final Map<Binary, DeliveryState> unsettled) // fluent setter: stores the unsettled map on the pending Attach; nothing is sent until attach() is called
+ {
+ _attach.setUnsettled(unsettled);
+ return this; // returns this Interaction so calls can be chained
+ }
+
+ public Interaction attachIncompleteUnsettled(final Boolean incompleteUnsettled) // fluent setter: stores the incomplete-unsettled flag on the pending Attach; nothing is sent until attach() is called
+ {
+ _attach.setIncompleteUnsettled(incompleteUnsettled);
+ return this; // returns this Interaction so calls can be chained
+ }
+
public Interaction attach() throws Exception
{
sendPerformativeAndChainFuture(_attach, _sessionChannel);
@@ -501,28 +513,33 @@ public class Interaction
return this;
}
+ public Interaction transferDeliveryId(final UnsignedInteger deliveryId) // fluent setter: stores the delivery-id on the pending Transfer; nothing is sent until transfer() is called
+ {
+ _transfer.setDeliveryId(deliveryId);
+ return this; // returns this Interaction so calls can be chained
+ }
+
public Interaction transferDeliveryTag(final Binary deliveryTag)
{
_transfer.setDeliveryTag(deliveryTag);
return this;
}
- public Interaction transferState(final DeliveryState state)
+ public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
{
- _transfer.setState(state);
+ _transfer.setMessageFormat(messageFormat);
return this;
}
- public Interaction transferTransactionalState(final Binary transactionalId)
+ public Interaction transferSettled(final Boolean settled)
{
- TransactionalState transactionalState = new TransactionalState();
- transactionalState.setTxnId(transactionalId);
- return transferState(transactionalState);
+ _transfer.setSettled(settled);
+ return this;
}
- public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
+ public Interaction transferMore(final Boolean more)
{
- _transfer.setDeliveryId(deliveryId);
+ _transfer.setMore(more);
return this;
}
@@ -532,25 +549,32 @@ public class Interaction
return this;
}
- public Interaction transferMore(final Boolean more)
+ public Interaction transferState(final DeliveryState state)
{
- _transfer.setMore(more);
+ _transfer.setState(state);
return this;
}
- public Interaction transferAborted(final Boolean aborted)
+ public Interaction transferTransactionalState(final Binary transactionalId)
{
- _transfer.setAborted(aborted);
+ TransactionalState transactionalState = new TransactionalState();
+ transactionalState.setTxnId(transactionalId);
+ return transferState(transactionalState);
+ }
+
+ public Interaction transferResume(final Boolean resume)
+ {
+ _transfer.setResume(resume);
return this;
}
- public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
+ public Interaction transferAborted(final Boolean aborted)
{
- _transfer.setMessageFormat(messageFormat);
+ _transfer.setAborted(aborted);
return this;
}
- public Interaction setPayloadOnTransfer(final List<QpidByteBuffer> payload)
+ public Interaction transferPayload(final List<QpidByteBuffer> payload)
{
_transfer.setPayload(payload);
return this;
@@ -558,11 +582,11 @@ public class Interaction
public Interaction transferPayloadData(final Object payload)
{
- setPayloadOnTransfer(_transfer, payload);
+ transferPayload(_transfer, payload);
return this;
}
- private void setPayloadOnTransfer(final Transfer transfer, final Object payload)
+ private void transferPayload(final Transfer transfer, final Object payload)
{
AmqpValue amqpValue = new AmqpValue(payload);
final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
@@ -575,12 +599,6 @@ public class Interaction
}
}
- public Interaction transferSettled(final Boolean settled)
- {
- _transfer.setSettled(settled);
- return this;
- }
-
public Interaction transfer() throws Exception
{
sendPerformativeAndChainFuture(_transfer, _sessionChannel);
@@ -648,7 +666,7 @@ public class Interaction
public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
{
Transfer transfer = createTransactionTransfer(txnState.getHandle());
- setPayloadOnTransfer(transfer, new Declare());
+ transferPayload(transfer, new Declare());
sendPerformativeAndChainFuture(transfer, _sessionChannel);
consumeResponse(Disposition.class);
Disposition declareTransactionDisposition = getLatestResponse(Disposition.class);
@@ -668,7 +686,7 @@ public class Interaction
discharge.setFail(failed);
Transfer transfer = createTransactionTransfer(txnState.getHandle());
- setPayloadOnTransfer(transfer, discharge);
+ transferPayload(transfer, discharge);
sendPerformativeAndChainFuture(transfer, _sessionChannel);
Disposition declareTransactionDisposition = null;
@@ -818,6 +836,7 @@ public class Interaction
public Interaction receiveDelivery() throws Exception
{
+ sync();
_latestDelivery = receiveAllTransfers();
return this;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index 446c112..f0373f5 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -25,8 +25,12 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import java.net.InetSocketAddress;
+import java.util.List;
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
@@ -87,4 +91,48 @@ public class Utils
return interaction.getDecodedLatestDelivery();
}
}
+
+ public static QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts) // builds a message payload from messageContent via MessageEncoder and returns its bytes copied, in order, into numberOfParts new buffers
+ {
+ MessageEncoder messageEncoder = new MessageEncoder();
+ final Header header = new Header(); // default-constructed header; no fields are set on it here
+ messageEncoder.setHeader(header);
+ messageEncoder.addData(messageContent);
+ List<QpidByteBuffer> payload = messageEncoder.getPayload();
+ long size = QpidByteBufferUtils.remaining(payload); // presumably the total remaining bytes across all payload buffers - confirm against QpidByteBufferUtils
+
+ QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
+ int chunkSize = (int) size / numberOfParts; // the cast binds to size before the integer division; numberOfParts == 0 divides by zero here
+ int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1); // the final part also absorbs the division remainder, so the part sizes sum to size
+ for (int i = 0; i < numberOfParts; i++)
+ {
+ result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize); // NOTE(review): first argument looks like a direct-buffer flag - confirm
+ }
+
+ int currentBufferIndex = 0; // index of the part currently being filled; only ever moves forward
+ for (QpidByteBuffer p : payload)
+ {
+ final int limit = p.limit(); // remembered so the limit can be restored after each partial copy
+
+ while (p.hasRemaining())
+ {
+ QpidByteBuffer currentBuffer = result[currentBufferIndex];
+ if (currentBuffer.hasRemaining())
+ {
+ int length = Math.min(p.remaining(), currentBuffer.remaining()); // copy no more than the source still holds or the current part can take
+ p.limit(p.position() + length); // temporarily narrow p to exactly the bytes being copied
+ currentBuffer.put(p.slice()); // the copy goes through a slice of the narrowed window
+ p.position(p.position() + length); // p is advanced explicitly past the copied bytes
+ p.limit(limit); // restore the original limit so the rest of p is visible again
+ }
+
+ if (!currentBuffer.hasRemaining()) // also true when hasRemaining() was already false above (e.g. a part that cannot accept any bytes)
+ {
+ currentBuffer.flip(); // part is full: flip it, then move on to the next part
+ currentBufferIndex++;
+ }
+ }
+ }
+ return result;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
index 2f83a13..1c09bb6 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -31,7 +31,6 @@ import static org.hamcrest.Matchers.isOneOf;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import org.junit.After;
@@ -39,11 +38,9 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
@@ -57,10 +54,10 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
-import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
import org.apache.qpid.tests.protocol.v1_0.Response;
import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
public class MultiTransferTest extends ProtocolTestBase
{
@@ -99,7 +96,7 @@ public class MultiTransferTest extends ProtocolTestBase
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] payloads = splitPayload("testData", 2);
+ QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -114,14 +111,14 @@ public class MultiTransferTest extends ProtocolTestBase
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+ .transferPayload(Collections.singletonList(payloads[0]))
.transferDeliveryId(deliveryId)
.transferDeliveryTag(deliveryTag)
.transferMore(true)
.transfer()
.sync()
.transferMore(false)
- .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+ .transferPayload(Collections.singletonList(payloads[1]))
.transfer()
.consumeResponse()
.getLatestResponse(Disposition.class);
@@ -141,7 +138,7 @@ public class MultiTransferTest extends ProtocolTestBase
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] payloads = splitPayload("testData", 4);
+ QpidByteBuffer[] payloads = Utils.splitPayload("testData", 4);
final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -158,25 +155,25 @@ public class MultiTransferTest extends ProtocolTestBase
.transferDeliveryId(deliveryId)
.transferDeliveryTag(deliveryTag)
.transferMore(true)
- .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+ .transferPayload(Collections.singletonList(payloads[0]))
.transfer()
.sync()
.transferDeliveryId(deliveryId)
.transferDeliveryTag(null)
.transferMore(true)
- .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+ .transferPayload(Collections.singletonList(payloads[1]))
.transfer()
.sync()
.transferDeliveryId(null)
.transferDeliveryTag(deliveryTag)
.transferMore(true)
- .setPayloadOnTransfer(Collections.singletonList(payloads[2]))
+ .transferPayload(Collections.singletonList(payloads[2]))
.transfer()
.sync()
.transferDeliveryId(null)
.transferDeliveryTag(null)
.transferMore(false)
- .setPayloadOnTransfer(Collections.singletonList(payloads[3]))
+ .transferPayload(Collections.singletonList(payloads[3]))
.transfer()
.consumeResponse();
@@ -200,7 +197,7 @@ public class MultiTransferTest extends ProtocolTestBase
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] payloads = splitPayload("testData", 2);
+ QpidByteBuffer[] payloads = Utils.splitPayload("testData", 2);
final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
@@ -215,13 +212,13 @@ public class MultiTransferTest extends ProtocolTestBase
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+ .transferPayload(Collections.singletonList(payloads[0]))
.transferDeliveryId(deliveryId)
.transferDeliveryTag(deliveryTag)
.transferMore(true)
.transfer()
.sync()
- .setPayloadOnTransfer(null)
+ .transferPayload(null)
.transferMore(null)
.transferAborted(true)
.transfer();
@@ -237,8 +234,8 @@ public class MultiTransferTest extends ProtocolTestBase
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
- QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+ QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
+ QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
@@ -275,7 +272,7 @@ public class MultiTransferTest extends ProtocolTestBase
.transferDeliveryId(deliverId1)
.transferDeliveryTag(deliveryTag1)
.transferMore(true)
- .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+ .transferPayload(Collections.singletonList(messagePayload1[0]))
.transfer()
.sync()
@@ -283,7 +280,7 @@ public class MultiTransferTest extends ProtocolTestBase
.transferDeliveryId(deliveryId2)
.transferDeliveryTag(deliveryTag2)
.transferMore(true)
- .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+ .transferPayload(Collections.singletonList(messagePayload2[0]))
.transfer()
.sync()
@@ -291,7 +288,7 @@ public class MultiTransferTest extends ProtocolTestBase
.transferDeliveryId(deliverId1)
.transferDeliveryTag(deliveryTag1)
.transferMore(false)
- .setPayloadOnTransfer(Collections.singletonList(messagePayload1[1]))
+ .transferPayload(Collections.singletonList(messagePayload1[1]))
.transfer()
.sync()
@@ -299,7 +296,7 @@ public class MultiTransferTest extends ProtocolTestBase
.transferDeliveryId(deliveryId2)
.transferDeliveryTag(deliveryTag2)
.transferMore(false)
- .setPayloadOnTransfer(Collections.singletonList(messagePayload2[1]))
+ .transferPayload(Collections.singletonList(messagePayload2[1]))
.transfer()
.sync();
@@ -328,8 +325,8 @@ public class MultiTransferTest extends ProtocolTestBase
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
- QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+ QpidByteBuffer[] messagePayload1 = Utils.splitPayload("testData1", 2);
+ QpidByteBuffer[] messagePayload2 = Utils.splitPayload("testData2", 2);
Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
@@ -352,62 +349,18 @@ public class MultiTransferTest extends ProtocolTestBase
.transferDeliveryId(deliverId1)
.transferDeliveryTag(deliveryTag1)
.transferMore(true)
- .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+ .transferPayload(Collections.singletonList(messagePayload1[0]))
.transfer()
.sync()
.transferDeliveryId(deliveryId2)
.transferDeliveryTag(deliveryTag2)
.transferMore(true)
- .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+ .transferPayload(Collections.singletonList(messagePayload2[0]))
.transfer()
.sync();
interaction.consumeResponse(Detach.class, End.class, Close.class);
}
}
-
- private QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts)
- {
- MessageEncoder messageEncoder = new MessageEncoder();
- final Header header = new Header();
- messageEncoder.setHeader(header);
- messageEncoder.addData(messageContent);
- List<QpidByteBuffer> payload = messageEncoder.getPayload();
- long size = QpidByteBufferUtils.remaining(payload);
-
- QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
- int chunkSize = (int) size / numberOfParts;
- int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1);
- for (int i = 0; i < numberOfParts; i++)
- {
- result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize);
- }
-
- int currentBufferIndex = 0;
- for (QpidByteBuffer p : payload)
- {
- final int limit = p.limit();
-
- while (p.hasRemaining())
- {
- QpidByteBuffer currentBuffer = result[currentBufferIndex];
- if (currentBuffer.hasRemaining())
- {
- int length = Math.min(p.remaining(), currentBuffer.remaining());
- p.limit(p.position() + length);
- currentBuffer.put(p.slice());
- p.position(p.position() + length);
- p.limit(limit);
- }
-
- if (!currentBuffer.hasRemaining())
- {
- currentBuffer.flip();
- currentBufferIndex++;
- }
- }
- }
- return result;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index a5f23e2..85baddf 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -21,10 +21,8 @@
package org.apache.qpid.tests.protocol.v1_0.messaging;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.hamcrest.CoreMatchers.anyOf;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
@@ -294,7 +292,7 @@ public class TransferTest extends ProtocolTestBase
Rejected.REJECTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .setPayloadOnTransfer(messageEncoder.getPayload())
+ .transferPayload(messageEncoder.getPayload())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
.consumeResponse()
@@ -339,7 +337,7 @@ public class TransferTest extends ProtocolTestBase
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .setPayloadOnTransfer(messageEncoder.getPayload())
+ .transferPayload(messageEncoder.getPayload())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
.consumeResponse()
@@ -780,6 +778,7 @@ public class TransferTest extends ProtocolTestBase
interaction.transferDeliveryId(UnsignedInteger.ZERO)
.transferDeliveryTag(deliveryTag)
.transferPayloadData("test")
+ .transferSettled(true)
.transfer()
.sync()
@@ -789,41 +788,6 @@ public class TransferTest extends ProtocolTestBase
.transfer()
.sync();
- boolean firstSettled = false, secondSettled = false;
- do
- {
- interaction.consumeResponse();
- Response<?> response = interaction.getLatestResponse();
- assertThat(response, is(notNullValue()));
-
- Object body = response.getBody();
-
- if (body instanceof Disposition)
- {
- Disposition disposition = (Disposition) body;
- assertThat(disposition.getSettled(), is(equalTo(true)));
- assertThat(disposition.getFirst(),
- anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE)));
- assertThat(disposition.getLast(),
- anyOf(equalTo(UnsignedInteger.ZERO), equalTo(UnsignedInteger.ONE), nullValue()));
-
- if (UnsignedInteger.ZERO.equals(disposition.getFirst()))
- {
- firstSettled = true;
- }
- if (UnsignedInteger.ONE.equals(disposition.getFirst())
- || UnsignedInteger.ONE.equals(disposition.getLast()))
- {
- secondSettled = true;
- }
- }
- else if (!(body instanceof Flow))
- {
- fail("Unexpected response " + body);
- }
- }
- while (!firstSettled || !secondSettled);
-
transport.doCloseConnection();
assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/737c5280/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
index 7168c47..6fe0b55 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -21,7 +21,6 @@
package org.apache.qpid.tests.protocol.v1_0.transaction;
import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org