You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/28 15:47:57 UTC
[1/3] qpid-broker-j git commit: QPID-7842 : [AMQP 1.0] Refactor
transfer functionality
Repository: qpid-broker-j
Updated Branches:
refs/heads/master d604344c2 -> 2e8efc0a9
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
new file mode 100644
index 0000000..7168c47
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/TransactionalTransferTest.java
@@ -0,0 +1,499 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0.transaction;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.InteractionTransactionalState;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+import org.apache.qpid.tests.protocol.v1_0.Utils;
+
+public class TransactionalTransferTest extends ProtocolTestBase
+{
+ private static final String TEST_MESSAGE_CONTENT = "testMessageContent";
+ private InetSocketAddress _brokerAddress;
+
+ @Before
+ public void setUp()
+ {
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @Test
+ @SpecificationTest(section = "4.4.4",
+ description = "Transactional Posting[...]the transaction controller wishes to associate an outgoing"
+ + " transfer with a transaction, it MUST set the state of the transfer with a"
+ + "transactional-state carrying the appropriate transaction identifier.")
+ public void sendTransactionalPostingReceiverSettlesFirst() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ Disposition responseDisposition = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachHandle(linkHandle)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle)
+ .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transfer()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+ assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+ interaction.txnDischarge(txnState, false);
+
+ Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.4.4",
+ description = "Transactional Posting[...]the transaction controller wishes to associate an outgoing"
+ + " transfer with a transaction, it MUST set the state of the transfer with a"
+ + "transactional-state carrying the appropriate transaction identifier.")
+ public void sendTransactionalPostingDischargeFail() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ Disposition responseDisposition = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachHandle(linkHandle)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle)
+ .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transfer()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.TRUE));
+ assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+ interaction.txnDischarge(txnState, true);
+
+ assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.4.4",
+ description = "Transactional Posting[...]the transaction controller wishes to associate an outgoing"
+ + " transfer with a transaction, it MUST set the state of the transfer with a"
+ + "transactional-state carrying the appropriate transaction identifier.")
+ public void sendTransactionalPostingReceiverSettlesSecond() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final UnsignedInteger linkHandle = UnsignedInteger.ONE;
+
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ Disposition responseDisposition = interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachHandle(linkHandle)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle)
+ .transferPayloadData(TEST_MESSAGE_CONTENT)
+ .transferTransactionalState(txnState.getCurrentTransactionId())
+ .transfer()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+
+ assertThat(responseDisposition.getRole(), is(Role.RECEIVER));
+ assertThat(responseDisposition.getSettled(), is(Boolean.FALSE));
+ assertThat(responseDisposition.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) responseDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+ interaction.dispositionRole(Role.SENDER)
+ .dispositionSettled(true)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .disposition();
+
+ interaction.txnDischarge(txnState, false);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
+ + "wish to associate the outcome of a delivery with a transaction.")
+ public void receiveTransactionalRetirementReceiverSettleFirst() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach()
+ .consumeResponse(Attach.class)
+
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .disposition()
+ .txnDischarge(txnState, false);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
+ + "wish to associate the outcome of a delivery with a transaction.")
+ public void receiveTransactionalRetirementDischargeFail() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach()
+ .consumeResponse(Attach.class)
+
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .disposition()
+ .txnDischarge(txnState, true);
+
+ Object receivedMessage = Utils.receiveMessage(_brokerAddress, BrokerAdmin.TEST_QUEUE_NAME);
+ assertThat(receivedMessage, is(equalTo(TEST_MESSAGE_CONTENT)));
+ }
+ }
+
+ @Ignore("TODO disposition is currently not being sent by Broker")
+ @Test
+ @SpecificationTest(section = "4.4.2", description = "Transactional Retirement[...] The transaction controller might"
+ + "wish to associate the outcome of a delivery with a transaction.")
+ public void receiveTransactionalRetirementReceiverSettleSecond() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attach()
+ .consumeResponse(Attach.class)
+
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ Disposition settledDisposition = interaction.dispositionSettled(false)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(),
+ new Accepted())
+ .disposition()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+
+ assertThat(settledDisposition.getSettled(), is(true));
+ assertThat(settledDisposition.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) settledDisposition.getState()).getOutcome(), is(instanceOf(Accepted.class)));
+
+ interaction.txnDischarge(txnState, false);
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.4.2", description = "Transactional Acquisition[...]In the case of the flow frame,"
+ + " the transactional work is not necessarily directly"
+ + " initiated or entirely determined when the flow frame"
+ + " arrives at the resource, but can in fact occur at some "
+ + " later point and in ways not necessarily"
+ + " anticipated by the controller.")
+ public void receiveTransactionalAcquisitionReceiverSettleFirst() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach()
+ .consumeResponse(Attach.class)
+
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId()))
+ .flow()
+
+ .receiveDelivery();
+
+ List<Transfer> transfers = interaction.getLatestDelivery();
+ assertThat(transfers.size(), is(equalTo(1)));
+ Transfer transfer = transfers.get(0);
+ assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
+
+ Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .disposition()
+ .txnDischarge(txnState, false);
+
+ assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(0)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "4.4.2", description = "Transactional Acquisition[...]In the case of the flow frame,"
+ + " the transactional work is not necessarily directly"
+ + " initiated or entirely determined when the flow frame"
+ + " arrives at the resource, but can in fact occur at some "
+ + " later point and in ways not necessarily"
+ + " anticipated by the controller.")
+ public void receiveTransactionalAcquisitionDischargeFail() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_CONTENT);
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction();
+ final InteractionTransactionalState txnState = interaction.createTransactionalState(UnsignedInteger.ZERO);
+ interaction.negotiateProtocol()
+ .consumeResponse()
+ .open()
+ .consumeResponse(Open.class)
+ .begin()
+ .consumeResponse(Begin.class)
+
+ .txnAttachCoordinatorLink(txnState)
+ .txnDeclare(txnState)
+
+ .attachRole(Role.RECEIVER)
+ .attachHandle(UnsignedInteger.ONE)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach()
+ .consumeResponse(Attach.class)
+
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flowProperties(Collections.singletonMap(Symbol.valueOf("txn-id"), txnState.getCurrentTransactionId()))
+ .flow()
+
+ .receiveDelivery();
+
+ List<Transfer> transfers = interaction.getLatestDelivery();
+ assertThat(transfers.size(), is(equalTo(1)));
+ Transfer transfer = transfers.get(0);
+ assertThat(transfer.getState(), is(instanceOf(TransactionalState.class)));
+ assertThat(((TransactionalState) transfer.getState()).getTxnId(), is(equalTo(txnState.getCurrentTransactionId())));
+
+ Object data = interaction.decodeLatestDelivery().getDecodedLatestDelivery();
+ assertThat(data, is(equalTo(TEST_MESSAGE_CONTENT)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionTransactionalState(txnState.getCurrentTransactionId(), new Accepted())
+ .disposition()
+ .txnDischarge(txnState, true);
+
+ assumeThat(getBrokerAdmin().isQueueDepthSupported(), is(true));
+ assertThat(getBrokerAdmin().getQueueDepthMessages(BrokerAdmin.TEST_QUEUE_NAME), is(equalTo(1)));
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-7842 : [AMQP 1.0] Refactor
transfer functionality
Posted by or...@apache.org.
QPID-7842 : [AMQP 1.0] Refactor transfer functionality
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2e8efc0a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2e8efc0a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2e8efc0a
Branch: refs/heads/master
Commit: 2e8efc0a9cdc25bdd52589866f13f3e1b99bb8f0
Parents: d604344
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Jun 22 15:25:01 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jun 28 16:14:05 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AbstractLinkEndpoint.java | 62 ++-
.../v1_0/AbstractReceivingLinkEndpoint.java | 256 ++++++----
.../protocol/v1_0/ConsumerTarget_1_0.java | 6 +-
.../qpid/server/protocol/v1_0/Delivery.java | 144 +++++-
.../protocol/v1_0/ErrantLinkEndpoint.java | 12 +-
.../qpid/server/protocol/v1_0/LinkEndpoint.java | 3 +-
.../protocol/v1_0/SendingLinkEndpoint.java | 197 ++++----
.../qpid/server/protocol/v1_0/Session_1_0.java | 155 +++---
.../v1_0/StandardReceivingLinkEndpoint.java | 162 +-----
.../TxnCoordinatorReceivingLinkEndpoint.java | 60 +--
.../protocol/v1_0/codec/ValueHandler.java | 2 +-
.../v1_0/delivery/DeliveryRegistry.java | 34 ++
.../v1_0/delivery/DeliveryRegistryImpl.java | 79 +++
.../v1_0/delivery/UnsettledDelivery.java | 46 ++
.../qpid/server/protocol/v1_0/type/Outcome.java | 2 +-
.../protocol/v1_0/type/transport/Attach.java | 12 +-
.../protocol/v1_0/type/transport/Begin.java | 6 +-
.../v1_0/type/transport/Disposition.java | 7 +
.../protocol/v1_0/type/transport/Flow.java | 7 +-
.../qpid/tests/protocol/v1_0/BrokerAdmin.java | 4 +
.../v1_0/EmbeddedBrokerPerClassAdminImpl.java | 14 +
.../v1_0/ExternalQpidBrokerAdminImpl.java | 12 +
.../qpid/tests/protocol/v1_0/Interaction.java | 261 +++++++++-
.../v1_0/InteractionTransactionalState.java | 50 ++
.../apache/qpid/tests/protocol/v1_0/Utils.java | 17 +-
.../v1_0/messaging/MultiTransferTest.java | 413 +++++++++++++++
.../protocol/v1_0/messaging/TransferTest.java | 345 ++++++++++++-
.../v1_0/transaction/DischargeTest.java | 2 +-
.../transaction/TransactionalTransferTest.java | 499 +++++++++++++++++++
29 files changed, 2272 insertions(+), 597 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 00f8724..a02a297 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -52,24 +51,22 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
private final Link_1_0<S, T> _link;
private final Session_1_0 _session;
+
+ // todo: remove client specific part
private Object _flowTransactionId;
- private SenderSettleMode _sendingSettlementMode;
- private ReceiverSettleMode _receivingSettlementMode;
- private Map _initialUnsettledMap;
- private UnsignedInteger _lastSentCreditLimit;
+ private volatile SenderSettleMode _sendingSettlementMode;
+ private volatile ReceiverSettleMode _receivingSettlementMode;
+ private volatile UnsignedInteger _lastSentCreditLimit;
private volatile boolean _stopped;
private volatile boolean _stoppedUpdated;
- private Symbol[] _capabilities;
- private SequenceNumber _deliveryCount;
- private UnsignedInteger _linkCredit;
- private UnsignedInteger _available;
- private Boolean _drain;
- private UnsignedInteger _localHandle;
- private UnsignedLong _maxMessageSize;
- private Map<Symbol, Object> _properties;
-
- protected volatile State _state = State.ATTACH_RECVD;
- protected Map _localUnsettled;
+ private volatile Symbol[] _capabilities;
+ private volatile SequenceNumber _deliveryCount;
+ private volatile UnsignedInteger _linkCredit;
+ private volatile UnsignedInteger _available;
+ private volatile Boolean _drain;
+ private volatile UnsignedInteger _localHandle;
+ private volatile Map<Symbol, Object> _properties;
+ private volatile State _state = State.ATTACH_RECVD;
protected enum State
{
@@ -88,12 +85,13 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
_link = link;
}
- protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+ protected abstract void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
protected abstract void remoteDetachedPerformDetach(final Detach detach);
protected abstract Map<Symbol,Object> initProperties(final Attach attach);
+ protected abstract Map<Binary, DeliveryState> getLocalUnsettled();
@Override
public void receiveAttach(final Attach attach) throws AmqpErrorException
@@ -131,9 +129,17 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
{
_sendingSettlementMode = attach.getSndSettleMode();
_receivingSettlementMode = attach.getRcvSettleMode();
- _initialUnsettledMap = attach.getUnsettled();
_properties = initProperties(attach);
_state = State.ATTACH_RECVD;
+
+ if (getRole() == Role.RECEIVER)
+ {
+ getSession().getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
+ }
+ else
+ {
+ getSession().getOutgoingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
+ }
}
public boolean isStopped()
@@ -229,20 +235,16 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
}
}
- public void addUnsettled(final Delivery unsettled)
- {
- }
-
@Override
- public void receiveDeliveryState(final Delivery unsettled,
+ public void receiveDeliveryState(final Binary deliveryTag,
final DeliveryState state,
final Boolean settled)
{
- handle(unsettled.getDeliveryTag(), state, settled);
+ handleDeliveryState(deliveryTag, state, settled);
if (Boolean.TRUE.equals(settled))
{
- settle(unsettled.getDeliveryTag());
+ settle(deliveryTag);
}
}
@@ -297,7 +299,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
attachToSend.setTarget(getTarget());
attachToSend.setSndSettleMode(getSendingSettlementMode());
attachToSend.setRcvSettleMode(getReceivingSettlementMode());
- attachToSend.setUnsettled(_localUnsettled);
+ attachToSend.setUnsettled(getLocalUnsettled());
attachToSend.setProperties(_properties);
attachToSend.setOfferedCapabilities(_capabilities);
@@ -496,13 +498,6 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
_capabilities = capabilities == null ? null : capabilities.toArray(new Symbol[capabilities.size()]);
}
- public Map getInitialUnsettledMap()
- {
- return _initialUnsettledMap;
- }
-
- public abstract void initialiseUnsettled();
-
@Override public String toString()
{
return "LinkEndpoint{" +
@@ -517,7 +512,6 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
", _available=" + _available +
", _drain=" + _drain +
", _localHandle=" + _localHandle +
- ", _maxMessageSize=" + _maxMessageSize +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 4b55186..d159de5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -35,48 +36,21 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extends AbstractLinkEndpoint<Source, T>
{
private final SectionDecoder _sectionDecoder;
- private UnsignedInteger _lastDeliveryId;
- private Map<Binary, Object> _unsettledMap = new LinkedHashMap<>();
- private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<>();
- private boolean _creditWindow;
-
-
- private static class TransientState
- {
-
- UnsignedInteger _deliveryId;
- boolean _settled;
-
- private TransientState(final UnsignedInteger transferId)
- {
- _deliveryId = transferId;
- }
-
- public UnsignedInteger getDeliveryId()
- {
- return _deliveryId;
- }
-
- public boolean isSettled()
- {
- return _settled;
- }
-
- public void setSettled(boolean settled)
- {
- _settled = settled;
- }
- }
+ final Map<Binary, DeliveryState> _unsettled = Collections.synchronizedMap(new LinkedHashMap<>());
+ private volatile boolean _creditWindow;
+ private volatile Delivery _currentDelivery;
public AbstractReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, T> link)
{
@@ -98,73 +72,162 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
return Role.RECEIVER;
}
- Error receiveTransfer(final Transfer transfer, final Delivery delivery)
+ void receiveTransfer(final Transfer transfer)
{
if(isAttached())
{
- TransientState transientState;
- final Binary deliveryTag = delivery.getDeliveryTag();
- boolean existingState = _unsettledMap.containsKey(deliveryTag);
- if (!existingState || transfer.getState() != null)
+ if (!ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
+ && ReceiverSettleMode.SECOND.equals(transfer.getRcvSettleMode()))
{
- _unsettledMap.put(deliveryTag, transfer.getState());
+ Error error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
+ close(error);
+ return;
}
- if (!existingState)
+
+ if (_currentDelivery == null)
{
- transientState = new TransientState(transfer.getDeliveryId());
- if (delivery.isSettled())
+ Error error = validateNewTransfer(transfer);
+ if (error != null)
{
- transientState.setSettled(true);
+ close(error);
+ return;
}
- _unsettledIds.put(deliveryTag, transientState);
+ _currentDelivery = new Delivery(transfer, this);
+
setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
getDeliveryCount().incr();
+
+ getSession().getIncomingDeliveryRegistry()
+ .addDelivery(transfer.getDeliveryId(),
+ new UnsettledDelivery(transfer.getDeliveryTag(), this));
}
else
{
- transientState = _unsettledIds.get(deliveryTag);
- if (delivery.isSettled())
+ Error error = validateSubsequentTransfer(transfer);
+ if (error != null)
{
- transientState.setSettled(true);
+ close(error);
+ return;
}
+ _currentDelivery.addTransfer(transfer);
+ }
+
+ if (!_currentDelivery.getResume())
+ {
+ _unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
+ }
+ else if (!_unsettled.containsKey(_currentDelivery.getDeliveryTag()))
+ {
+ final Error error = new Error(AmqpError.ILLEGAL_STATE,
+ String.format("Resumed transfer with delivery tag '%s' is not found.",
+ _currentDelivery.getDeliveryTag()));
+ close(error);
+ return;
}
- if (transientState.isSettled() && delivery.isComplete())
+ if (_currentDelivery.isAborted())
{
- _unsettledMap.remove(deliveryTag);
+ _unsettled.remove(_currentDelivery.getDeliveryTag());
+ getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
+ _currentDelivery = null;
+
+ setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+ getDeliveryCount().decr();
+ }
+ else if (_currentDelivery.isComplete())
+ {
+ try
+ {
+ if (_currentDelivery.isSettled())
+ {
+ _unsettled.remove(_currentDelivery.getDeliveryTag());
+ getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
+ }
+ Error error = receiveDelivery(_currentDelivery);
+ if (error != null)
+ {
+ close(error);
+ }
+ }
+ finally
+ {
+ _currentDelivery = null;
+ }
}
- return messageTransfer(transfer);
}
else
{
+ // TODO: it is wrong
getSession().updateDisposition(Role.RECEIVER, transfer.getDeliveryId(), transfer.getDeliveryId(),null, true);
- return null;
}
}
- protected abstract Error messageTransfer(final Transfer transfer);
-
- @Override public void receiveFlow(final Flow flow)
+ private Error validateNewTransfer(final Transfer transfer)
{
- setAvailable(flow.getAvailable());
- setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
+ Error error = null;
+ if (transfer.getDeliveryId() == null)
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"delivery-id\" is required for a new delivery.");
+ }
+ else if (transfer.getDeliveryTag() == null)
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"delivery-tag\" is required for a new delivery.");
+ }
+ return error;
}
- public boolean settled(final Binary deliveryTag)
+ private Error validateSubsequentTransfer(final Transfer transfer)
{
- boolean deleted;
- if (deleted = (_unsettledIds.remove(deliveryTag) != null))
+ Error error = null;
+ if (transfer.getDeliveryId() != null && !_currentDelivery.getDeliveryId()
+ .equals(transfer.getDeliveryId()))
{
- _unsettledMap.remove(deliveryTag);
-
+ error = new Error(AmqpError.INVALID_FIELD,
+ String.format(
+ "Unexpected transfer \"delivery-id\" for multi-transfer delivery: found '%s', expected '%s'.",
+ transfer.getDeliveryId(),
+ _currentDelivery.getDeliveryId()));
}
+ else if (transfer.getDeliveryTag() != null && !_currentDelivery.getDeliveryTag()
+ .equals(transfer.getDeliveryTag()))
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ String.format(
+ "Unexpected transfer \"delivery-tag\" for multi-transfer delivery: found '%s', expected '%s'.",
+ transfer.getDeliveryTag(),
+ _currentDelivery.getDeliveryTag()));
+ }
+ else if (_currentDelivery.getReceiverSettleMode() != null && transfer.getRcvSettleMode() != null
+ && !_currentDelivery.getReceiverSettleMode().equals(transfer.getRcvSettleMode()))
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
+ }
+ return error;
+ }
+
+ protected abstract Error receiveDelivery(final Delivery delivery);
- return deleted;
+ @Override
+ public void receiveFlow(final Flow flow)
+ {
+ setAvailable(flow.getAvailable());
+ setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
+ }
+
+ private boolean settled(final Binary deliveryTag)
+ {
+ return _unsettled.remove(deliveryTag) != null;
}
- public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+ public void updateDisposition(final Binary deliveryTag,
+ final DeliveryState state,
+ final boolean settled)
{
- if (_unsettledMap.containsKey(deliveryTag))
+ if (_unsettled.containsKey(deliveryTag))
{
boolean outcomeUpdate = false;
Outcome outcome = null;
@@ -174,27 +237,23 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
}
else if (state instanceof TransactionalState)
{
- // TODO? Is this correct
outcome = ((TransactionalState) state).getOutcome();
}
if (outcome != null)
{
- Object oldOutcome = _unsettledMap.put(deliveryTag, outcome);
- outcomeUpdate = !outcome.equals(oldOutcome);
+ if (!(_unsettled.get(deliveryTag) instanceof Outcome))
+ {
+ Object oldOutcome = _unsettled.put(deliveryTag, outcome);
+ outcomeUpdate = !outcome.equals(oldOutcome);
+ }
}
-
- TransientState transientState = _unsettledIds.get(deliveryTag);
if (outcomeUpdate || settled)
{
-
- final UnsignedInteger transferId = transientState.getDeliveryId();
-
- getSession().updateDisposition(getRole(), transferId, transferId, state, settled);
+ getSession().updateDisposition(getRole(), deliveryTag, state, settled);
}
-
if (settled)
{
@@ -212,33 +271,28 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
}
}
}
- else
+ else if (_creditWindow)
{
- TransientState transientState = _unsettledIds.get(deliveryTag);
- if (_creditWindow)
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- sendFlowConditional();
- }
-
+ setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+ sendFlowConditional();
}
}
- public void setCreditWindow()
+ void setCreditWindow()
{
setCreditWindow(true);
}
- public void setCreditWindow(boolean window)
+ private void setCreditWindow(boolean window)
{
_creditWindow = window;
sendFlowConditional();
}
@Override
- public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
+ public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
- super.receiveDeliveryState(unsettled, state, settled);
+ super.receiveDeliveryState(deliveryTag, state, settled);
if(_creditWindow)
{
if(Boolean.TRUE.equals(settled))
@@ -258,8 +312,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
public void settle(Binary deliveryTag)
{
super.settle(deliveryTag);
- _unsettledIds.remove(deliveryTag);
- _unsettledMap.remove(deliveryTag);
+ _unsettled.remove(deliveryTag);
if(_creditWindow)
{
sendFlowConditional();
@@ -271,15 +324,32 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
{
}
- UnsignedInteger getLastDeliveryId()
+ @Override
+ protected void detach(final Error error, final boolean close)
{
- return _lastDeliveryId;
+ try
+ {
+ super.detach(error, close);
+ }
+ finally
+ {
+ if (close)
+ {
+ if (_currentDelivery != null)
+ {
+ _currentDelivery.discard();
+ _currentDelivery = null;
+ }
+ }
+ }
}
- void setLastDeliveryId(UnsignedInteger lastDeliveryId)
+ @Override
+ protected void handleDeliveryState(Binary deliveryTag, DeliveryState state, Boolean settled)
{
- _lastDeliveryId = lastDeliveryId;
+ if(Boolean.TRUE.equals(settled))
+ {
+ _unsettled.remove(deliveryTag);
+ }
}
-
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index a535fd2..3942ca8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -79,7 +79,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
- private SendingLinkEndpoint _linkEndpoint;
+ private final SendingLinkEndpoint _linkEndpoint;
private final SectionEncoder _sectionEncoder;
private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
@@ -356,7 +356,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
updateNotifyWorkDesired();
- if (isSuspended() && getEndpoint() != null)
+ if (_linkEndpoint != null)
{
_transactionId = _linkEndpoint.getTransactionId();
}
@@ -454,7 +454,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
else
{
- _linkEndpoint.updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
+ _linkEndpoint.updateDisposition(_deliveryTag, outcome, true);
}
_linkEndpoint.sendFlowConditional();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index a5fb4bc..8facf8f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -19,38 +19,57 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class Delivery
{
private final UnsignedInteger _deliveryId;
private final Binary _deliveryTag;
+ private final List<Transfer> _transfers = new CopyOnWriteArrayList<>();
private final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> _linkEndpoint;
- private boolean _complete;
- private boolean _settled;
- private int _numberOfTransfers = 0;
+ private final UnsignedInteger _messageFormat;
+ private volatile boolean _complete;
+ private volatile boolean _settled;
+ private volatile boolean _aborted;
+ private volatile DeliveryState _state;
+ private volatile ReceiverSettleMode _receiverSettleMode;
+ private volatile boolean _resume;
public Delivery(Transfer transfer, final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint)
{
- _settled = Boolean.TRUE.equals(transfer.getSettled());
_deliveryId = transfer.getDeliveryId();
_deliveryTag = transfer.getDeliveryTag();
_linkEndpoint = endpoint;
+ _messageFormat = transfer.getMessageFormat();
addTransfer(transfer);
}
- public boolean isComplete()
+ public UnsignedInteger getDeliveryId()
{
- return _complete;
+ return _deliveryId;
+ }
+
+ public Binary getDeliveryTag()
+ {
+ return _deliveryTag;
}
- public void setComplete(final boolean complete)
+ public boolean isComplete()
{
- _complete = complete;
+ return _complete;
}
public boolean isSettled()
@@ -58,27 +77,93 @@ public class Delivery
return _settled;
}
- public void setSettled(final boolean settled)
+ public boolean isAborted()
{
- _settled = settled;
+ return _aborted;
}
- public final void addTransfer(Transfer transfer)
+ public DeliveryState getState()
{
- _numberOfTransfers++;
- if(Boolean.TRUE.equals(transfer.getAborted()) || !Boolean.TRUE.equals(transfer.getMore()))
+ return _state;
+ }
+
+ public ReceiverSettleMode getReceiverSettleMode()
+ {
+ return _receiverSettleMode;
+ }
+
+ public UnsignedInteger getMessageFormat()
+ {
+ return _messageFormat;
+ }
+
+
+ public boolean getResume()
+ {
+ return _resume;
+ }
+
+ final void addTransfer(Transfer transfer)
+ {
+ if (_aborted)
+ {
+ throw new IllegalStateException(String.format("Delivery '%s/%d' is already aborted",
+ _deliveryTag,
+ _deliveryId.intValue()));
+ }
+
+ if (_complete)
+ {
+ throw new IllegalStateException(String.format("Delivery '%s/%d' is already completed",
+ _deliveryTag,
+ _deliveryId.intValue()));
+ }
+
+ _transfers.add(transfer);
+ if (Boolean.TRUE.equals(transfer.getAborted()))
{
- setComplete(true);
+ _aborted = true;
+ discard();
+ }
+ if(!Boolean.TRUE.equals(transfer.getMore()))
+ {
+ _complete = true;
}
if(Boolean.TRUE.equals(transfer.getSettled()))
{
- setSettled(true);
+ _settled = true;
}
- }
- public UnsignedInteger getDeliveryId()
- {
- return _deliveryId;
+ if(Boolean.TRUE.equals(transfer.getResume()))
+ {
+ _resume = true;
+ }
+
+ if (transfer.getState() != null)
+ {
+ DeliveryState currentState;
+ if (_state instanceof TransactionalState)
+ {
+ currentState = ((TransactionalState) _state).getOutcome();
+ }
+ else
+ {
+ currentState = _state;
+ }
+ if (!(currentState instanceof Outcome))
+ {
+ _state = transfer.getState();
+ }
+ }
+
+ if (transfer.getRcvSettleMode() != null)
+ {
+ if (_receiverSettleMode == null)
+ {
+ _receiverSettleMode = transfer.getRcvSettleMode();
+ }
+
+ }
}
public LinkEndpoint<? extends BaseSource, ? extends BaseTarget> getLinkEndpoint()
@@ -86,13 +171,26 @@ public class Delivery
return _linkEndpoint;
}
- public Binary getDeliveryTag()
+
+ public List<QpidByteBuffer> getPayload()
{
- return _deliveryTag;
+ List<QpidByteBuffer> fragments = new ArrayList<>(_transfers.size());
+ for (Transfer t : _transfers)
+ {
+ fragments.addAll(t.getPayload());
+ t.dispose();
+ }
+ _transfers.clear();
+ return fragments;
}
- public int getNumberOfTransfers()
+ public void discard()
{
- return _numberOfTransfers;
+ for (Transfer transfer: _transfers)
+ {
+ transfer.dispose();
+ }
+ _transfers.clear();
}
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
index 0d9daa7..287f038 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -144,6 +145,12 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
}
@Override
+ public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ {
+
+ }
+
+ @Override
public void receiveFlow(final Flow flow)
{
throw new UnsupportedOperationException("This Link is errant");
@@ -173,9 +180,4 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
throw new UnsupportedOperationException("This Link is errant");
}
- @Override
- public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
- {
- throw new UnsupportedOperationException("This Link is errant");
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index 61a0199..94e3b1e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -52,7 +53,7 @@ public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
void remoteDetached(Detach detach);
- void receiveDeliveryState(Delivery unsettled,
+ void receiveDeliveryState(Binary deliveryTag,
DeliveryState state,
Boolean settled);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 78b6290..95dc34b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +53,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
@@ -80,23 +78,20 @@ import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
+ private static final Symbol PRIORITY = Symbol.valueOf("priority");
- public static final Symbol PRIORITY = Symbol.valueOf("priority");
- private UnsignedInteger _lastDeliveryId;
- private Binary _lastDeliveryTag;
- private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<>();
- private Map<Binary, MessageInstance> _unsettledMap2 = new HashMap<>();
- private Binary _transactionId;
- private Integer _priority;
private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
+ private final Map<Binary, OutgoingDelivery> _unsettled = new ConcurrentHashMap<>();
+
+ private volatile Binary _transactionId;
+ private volatile Integer _priority;
private volatile boolean _draining = false;
- private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
- private SendingDestination _destination;
- private EnumSet<ConsumerOption> _consumerOptions;
- private FilterManager _consumerFilters;
- private ConsumerTarget_1_0 _consumerTarget;
- private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
+ private volatile SendingDestination _destination;
+ private volatile EnumSet<ConsumerOption> _consumerOptions;
+ private volatile FilterManager _consumerFilters;
+ private volatile ConsumerTarget_1_0 _consumerTarget;
+ private volatile MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl<Source, Target> link)
{
@@ -111,7 +106,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
}
- public void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
+ private void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
{
// TODO FIXME: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it!
_destination = destination;
@@ -199,7 +194,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
_consumerFilters = filters;
}
- void createConsumerTarget() throws AmqpErrorException
+ private void createConsumerTarget() throws AmqpErrorException
{
final Source source = getSource();
_consumerTarget = new ConsumerTarget_1_0(this,
@@ -338,7 +333,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
attachReceived(attach);
- initialiseUnsettled();
}
@Override
@@ -372,17 +366,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
return Role.SENDER;
}
- public Integer getPriority()
+ private Integer getPriority()
{
return _priority;
}
- public TerminusDurability getTerminusDurability()
- {
- return getSource().getDurable();
- }
-
- public boolean transfer(final Transfer xfr, final boolean decrementCredit)
+ void transfer(final Transfer xfr, final boolean decrementCredit)
{
Session_1_0 s = getSession();
xfr.setMessageFormat(UnsignedInteger.ZERO);
@@ -395,27 +384,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
xfr.setHandle(getLocalHandle());
- s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
-
- if(!Boolean.TRUE.equals(xfr.getSettled()))
- {
- _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
- }
-
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- _lastDeliveryTag = xfr.getDeliveryTag();
- }
- else
- {
- _lastDeliveryTag = null;
- }
-
- return true;
+ s.sendTransfer(xfr, this, true);
}
- public boolean drained()
+ boolean drained()
{
if (_draining)
{
@@ -514,11 +487,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
Modified state = new Modified();
state.setDeliveryFailed(true);
- for (UnsettledAction action : _unsettledActionMap.values())
+ for (OutgoingDelivery delivery : _unsettled.values())
{
- action.process(state, Boolean.TRUE);
+ UnsettledAction action = delivery.getAction();
+ if (action != null)
+ {
+ action.process(state, Boolean.TRUE);
+ delivery.setAction(null);
+ }
}
- _unsettledActionMap.clear();
Error closingError = null;
if (getDestination() instanceof ExchangeDestination
@@ -558,20 +535,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
}
- public void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance queueEntry)
+ void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance messageInstance)
{
- _unsettledActionMap.put(tag, unsettledAction);
- if(getTransactionId() == null)
- {
- _unsettledMap2.put(tag, queueEntry);
- }
-
+ _unsettled.put(tag, new OutgoingDelivery(messageInstance, unsettledAction, null));
}
@Override
- protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ protected void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
- UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+ UnsettledAction action = _unsettled.get(deliveryTag).getAction();
boolean localSettle = false;
if(action != null)
{
@@ -583,9 +555,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
if(Boolean.TRUE.equals(settled) || localSettle)
{
- _unsettledActionMap.remove(deliveryTag);
- _unsettledMap.remove(deliveryTag);
- _unsettledMap2.remove(deliveryTag);
+ _unsettled.remove(deliveryTag);
}
}
@@ -602,23 +572,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
&& getSession().hasCreditToSend();
}
- public UnsignedInteger getLastDeliveryId()
- {
- return _lastDeliveryId;
- }
-
- public void setLastDeliveryId(final UnsignedInteger deliveryId)
- {
- _lastDeliveryId = deliveryId;
- }
-
public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
{
- UnsignedInteger deliveryId;
- if (settled && (deliveryId = _unsettledMap.remove(deliveryTag)) != null)
+ if (settled && (_unsettled.remove(deliveryTag) != null))
{
- _unsettledMap2.remove(deliveryTag);
- getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
+ getSession().updateDisposition(getRole(), deliveryTag, state, settled);
}
}
@@ -677,33 +635,34 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
getLink().setTarget(target);
- final MessageInstanceConsumer consumer = getConsumer();
+ final MessageInstanceConsumer oldConsumer = getConsumer();
createConsumerTarget();
_resumeAcceptedTransfers.clear();
_resumeFullTransfers.clear();
final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
- Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
- Map initialUnsettledMap = getInitialUnsettledMap();
+ Map<Binary, OutgoingDelivery> unsettledCopy = new HashMap<>(_unsettled);
+ Map<Binary, DeliveryState> remoteUnsettled =
+ attach.getUnsettled() == null ? Collections.emptyMap() : new HashMap<>(attach.getUnsettled());
- for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+ for (Map.Entry<Binary, OutgoingDelivery> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- final MessageInstance queueEntry = entry.getValue();
- if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+ final MessageInstance queueEntry = entry.getValue().getMessageInstance();
+ if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
- queueEntry.release(consumer);
- _unsettledMap2.remove(deliveryTag);
+ queueEntry.release(oldConsumer);
+ _unsettled.remove(deliveryTag);
}
- else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
+ else if (remoteUnsettled.get(deliveryTag) instanceof Outcome)
{
- Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+ Outcome outcome = (Outcome) remoteUnsettled.get(deliveryTag);
if (outcome instanceof Accepted)
{
- AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
- if (consumer.acquires())
+ if (oldConsumer.acquires())
{
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
if (queueEntry.acquire() || queueEntry.isAcquired())
{
txn.dequeue(Collections.singleton(queueEntry),
@@ -723,15 +682,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
else if (outcome instanceof Released)
{
- AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
- if (consumer.acquires())
+ if (oldConsumer.acquires())
{
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
{
public void postCommit()
{
- queueEntry.release(consumer);
+ queueEntry.release(oldConsumer);
}
public void onRollback()
@@ -740,14 +699,17 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
});
}
}
- //_unsettledMap.remove(deliveryTag);
- initialUnsettledMap.remove(deliveryTag);
+
+ // TODO: Handle rejected and modified outcome
+
+ remoteUnsettled.remove(deliveryTag);
_resumeAcceptedTransfers.add(deliveryTag);
}
else
{
_resumeFullTransfers.add(queueEntry);
- // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+
+ // TODO: exists in receivers map, but not yet got an outcome ... should resend with resume = true
}
}
@@ -755,22 +717,22 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
@Override
- public void initialiseUnsettled()
+ protected Map<Binary, DeliveryState> getLocalUnsettled()
{
- Map<Binary, MessageInstance> _localUnsettled = new HashMap<>(_unsettledMap2);
-
- for (Map.Entry<Binary, MessageInstance> entry : _localUnsettled.entrySet())
+ Map<Binary, DeliveryState> unsettled = new HashMap<>();
+ for (Map.Entry<Binary, OutgoingDelivery> entry : _unsettled.entrySet())
{
- entry.setValue(null);
+ unsettled.put(entry.getKey(), entry.getValue().getLocalState());
}
+ return unsettled;
}
- public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
+ private MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
{
return _consumer;
}
- public ConsumerTarget_1_0 getConsumerTarget()
+ ConsumerTarget_1_0 getConsumerTarget()
{
return _consumerTarget;
}
@@ -784,4 +746,45 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
_destination = destination;
}
+
+ private static class OutgoingDelivery
+ {
+ private final MessageInstance _messageInstance;
+ private volatile UnsettledAction _action;
+ private volatile DeliveryState _localState;
+
+ public OutgoingDelivery(final MessageInstance messageInstance,
+ final UnsettledAction action,
+ final DeliveryState localState)
+ {
+ _messageInstance = messageInstance;
+ _action = action;
+ _localState = localState;
+ }
+
+ public MessageInstance getMessageInstance()
+ {
+ return _messageInstance;
+ }
+
+ public UnsettledAction getAction()
+ {
+ return _action;
+ }
+
+ public DeliveryState getLocalState()
+ {
+ return _localState;
+ }
+
+ public void setLocalState(final DeliveryState localState)
+ {
+ _localState = localState;
+ }
+
+ public void setAction(final UnsettledAction action)
+ {
+ _action = action;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 29f1a01..65a5265 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -35,7 +35,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -72,6 +71,9 @@ import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistryImpl;
+import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
@@ -155,9 +157,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
private UnsignedInteger _lastSentIncomingLimit;
- private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
- private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
-
+ private final DeliveryRegistry _outgoingDeliveryRegistry = new DeliveryRegistryImpl();
+ private final DeliveryRegistry _incomingDeliveryRegistry = new DeliveryRegistryImpl();
private final Error _sessionEndedLinkError =
new Error(LinkError.DETACH_FORCED,
@@ -231,7 +232,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
}
- public void updateDisposition(final Role role,
+ void updateDisposition(final Role role,
final UnsignedInteger first,
final UnsignedInteger last,
final DeliveryState state, final boolean settled)
@@ -248,13 +249,13 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (settled)
{
- final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
- role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
+ final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+
SequenceNumber pos = new SequenceNumber(first.intValue());
SequenceNumber end = new SequenceNumber(last.intValue());
while (pos.compareTo(end) <= 0)
{
- unsettled.remove(UnsignedInteger.valueOf(pos.intValue()));
+ deliveryRegistry.removeDelivery(UnsignedInteger.valueOf(pos.intValue()));
pos.incr();
}
}
@@ -263,6 +264,21 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
//TODO - check send flow
}
+ void updateDisposition(final Role role,
+ final Binary deliveryTag,
+ final DeliveryState state,
+ final boolean settled)
+ {
+ final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+ UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag);
+ if (deliveryId == null)
+ {
+ throw new ConnectionScopedRuntimeException(String.format(
+ "Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
+ }
+ updateDisposition(role, deliveryId, deliveryId, state, settled);
+ }
+
public boolean hasCreditToSend()
{
boolean b = _remoteIncomingWindow > 0;
@@ -283,32 +299,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (newDelivery)
{
deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
- endpoint.setLastDeliveryId(deliveryId);
+ xfr.setDeliveryId(deliveryId);
if (!settled)
{
- final Delivery delivery = new Delivery(xfr, endpoint);
- _outgoingUnsettled.put(deliveryId, delivery);
- endpoint.addUnsettled(delivery);
- }
- }
- else
- {
- deliveryId = endpoint.getLastDeliveryId();
- final Delivery delivery = _outgoingUnsettled.get(deliveryId);
- if (delivery != null)
- {
- if (!settled)
- {
- delivery.addTransfer(xfr);
- }
- else
- {
- endpoint.settle(delivery.getDeliveryTag());
- _outgoingUnsettled.remove(deliveryId);
- }
+ final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
+ _outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
}
}
- xfr.setDeliveryId(deliveryId);
+
_remoteIncomingWindow--;
try
{
@@ -461,42 +459,40 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
Role dispositionRole = disposition.getRole();
- LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
+ DeliveryRegistry unsettledDeliveries;
if(dispositionRole == Role.RECEIVER)
{
- unsettledTransfers = _outgoingUnsettled;
+ unsettledDeliveries = _outgoingDeliveryRegistry;
}
else
{
- unsettledTransfers = _incomingUnsettled;
-
+ unsettledDeliveries = _incomingDeliveryRegistry;
}
SequenceNumber deliveryId = new SequenceNumber(disposition.getFirst().intValue());
SequenceNumber last;
if(disposition.getLast() == null)
{
- last = deliveryId;
+ last = new SequenceNumber(deliveryId.intValue());
}
else
{
last = new SequenceNumber(disposition.getLast().intValue());
}
-
while(deliveryId.compareTo(last)<=0)
{
UnsignedInteger deliveryIdUnsigned = UnsignedInteger.valueOf(deliveryId.intValue());
- Delivery delivery = unsettledTransfers.get(deliveryIdUnsigned);
- if(delivery != null)
+ UnsettledDelivery unsettledDelivery = unsettledDeliveries.getDelivery(deliveryIdUnsigned);
+
+ if(unsettledDelivery != null)
{
- delivery.getLinkEndpoint().receiveDeliveryState(delivery,
- disposition.getState(),
- disposition.getSettled());
+ LinkEndpoint<?,?> linkEndpoint = unsettledDelivery.getLinkEndpoint();
+ linkEndpoint.receiveDeliveryState(unsettledDelivery.getDeliveryTag(), disposition.getState(), disposition.getSettled());
if (Boolean.TRUE.equals(disposition.getSettled()))
{
- unsettledTransfers.remove(deliveryIdUnsigned);
+ unsettledDeliveries.removeDelivery(deliveryIdUnsigned);
}
}
deliveryId.incr();
@@ -607,62 +603,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
else
{
AbstractReceivingLinkEndpoint endpoint = ((AbstractReceivingLinkEndpoint) linkEndpoint);
-
- UnsignedInteger deliveryId = transfer.getDeliveryId();
- if (deliveryId == null)
- {
- deliveryId = endpoint.getLastDeliveryId();
- }
-
- Delivery delivery = _incomingUnsettled.get(deliveryId);
- if (delivery == null)
- {
- delivery = new Delivery(transfer, endpoint);
- _incomingUnsettled.put(deliveryId, delivery);
-
- if (Boolean.TRUE.equals(transfer.getMore()))
- {
- endpoint.setLastDeliveryId(transfer.getDeliveryId());
- }
- }
- else
- {
- if (delivery.getDeliveryId().equals(deliveryId))
- {
- delivery.addTransfer(transfer);
-
- if (!Boolean.TRUE.equals(transfer.getMore()))
- {
- endpoint.setLastDeliveryId(null);
- }
- }
- else
- {
- End reply = new End();
-
- Error error = new Error();
- error.setCondition(AmqpError.ILLEGAL_STATE);
- error.setDescription("TRANSFER called on Session for link handle "
- + inputHandle
- + " with incorrect delivery id "
- + transfer.getDeliveryId());
- reply.setError(error);
- _connection.sendEnd(_sendingChannel, reply, true);
-
- return;
-
- }
- }
-
- Error error = endpoint.receiveTransfer(transfer, delivery);
- if(error != null)
- {
- endpoint.close(error);
- }
- if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
- {
- _incomingUnsettled.remove(deliveryId);
- }
+ endpoint.receiveTransfer(transfer);
}
}
@@ -1536,6 +1477,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
_endpointToOutputHandle.remove(linkEndpoint);
_associatedLinkEndpoints.remove(linkEndpoint);
+ if (linkEndpoint.getRole() == Role.RECEIVER)
+ {
+ getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
+ }
+ else
+ {
+ getOutgoingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
+ }
}
private void detach(UnsignedInteger handle, Detach detach)
@@ -1614,6 +1563,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
return primaryDomain;
}
+ DeliveryRegistry getOutgoingDeliveryRegistry()
+ {
+ return _outgoingDeliveryRegistry;
+ }
+
+ DeliveryRegistry getIncomingDeliveryRegistry()
+ {
+ return _incomingDeliveryRegistry;
+ }
+
private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 96dbdc1..048fa20 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,20 +52,14 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Target>
{
private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
- private ArrayList<Transfer> _incompleteMessage;
- private boolean _resumedMessage;
- private Binary _messageDeliveryTag;
- private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
private ReceivingDestination _receivingDestination;
public StandardReceivingLinkEndpoint(final Session_1_0 session,
@@ -89,107 +82,30 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
@Override
- protected Error messageTransfer(Transfer xfr)
+ protected Error receiveDelivery(Delivery delivery)
{
- List<QpidByteBuffer> fragments = null;
-
- org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
- final Binary deliveryTag = xfr.getDeliveryTag();
- UnsignedInteger messageFormat = null;
- ReceiverSettleMode transferReceiverSettleMode = null;
- Error error = null;
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<>();
- _incompleteMessage.add(xfr);
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = deliveryTag;
- return null;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return null;
- }
-
- fragments = new ArrayList<>(_incompleteMessage.size());
-
- for (Transfer t : _incompleteMessage)
- {
- if (t.getMessageFormat() != null && messageFormat == null)
- {
- messageFormat = t.getMessageFormat();
- }
-
- if (t.getRcvSettleMode() != null)
- {
- if (transferReceiverSettleMode == null)
- {
- transferReceiverSettleMode = t.getRcvSettleMode();
- }
- else if (!transferReceiverSettleMode.equals(t.getRcvSettleMode()))
- {
- error = new Error(AmqpError.INVALID_FIELD,
- "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
- break;
- }
- }
- fragments.addAll(t.getPayload());
- t.dispose();
- }
- _incompleteMessage=null;
-
- }
- else
- {
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = deliveryTag;
- fragments = xfr.getPayload();
- messageFormat = xfr.getMessageFormat();
- transferReceiverSettleMode = xfr.getRcvSettleMode();
- xfr.dispose();
- }
-
- if (error == null && !ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
- && ReceiverSettleMode.SECOND.equals(transferReceiverSettleMode))
- {
- error = new Error(AmqpError.INVALID_FIELD,
- "Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
-
- }
-
- if (error != null)
- {
- for (QpidByteBuffer fragment : fragments)
- {
- fragment.dispose();
- }
- return error;
- }
+ ReceiverSettleMode transferReceiverSettleMode = delivery.getReceiverSettleMode();
- if(_resumedMessage)
+ if(delivery.getResume())
{
- if(_unsettledMap.containsKey(_messageDeliveryTag))
+ DeliveryState deliveryState = _unsettled.get(delivery.getDeliveryTag());
+ if (deliveryState instanceof Outcome)
{
- Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
- updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
- if(settled)
- {
- _unsettledMap.remove(_messageDeliveryTag);
- }
+ updateDisposition(delivery.getDeliveryTag(), deliveryState, settled);
+ return null;
}
else
{
- throw new ConnectionScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
+ // TODO: create message ?
}
}
else
{
ServerMessage<?> serverMessage;
-
+ UnsignedInteger messageFormat = delivery.getMessageFormat();
+ org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = delivery.getState();
+ List<QpidByteBuffer> fragments = delivery.getPayload();
MessageFormat format = MessageFormatRegistry.getFormat(messageFormat == null ? 0 : messageFormat.intValue());
if(format != null)
{
@@ -279,7 +195,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
{
if (transactionId == null)
{
- resultantState = (DeliveryState) outcome;
+ resultantState = outcome;
}
else
{
@@ -302,12 +218,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
- if (!settled)
- {
- _unsettledMap.put(deliveryTag, outcome);
- }
-
- updateDisposition(deliveryTag, resultantState, settled);
+ updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
getSession().getAMQPConnection()
.registerMessageReceived(serverMessage.getSize(), arrivalTime);
@@ -319,12 +230,12 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
{
public void postCommit()
{
- updateDisposition(deliveryTag, null, true);
+ updateDisposition(delivery.getDeliveryTag(), null, true);
}
public void onRollback()
{
- updateDisposition(deliveryTag, null, true);
+ updateDisposition(delivery.getDeliveryTag(), null, true);
}
});
}
@@ -375,16 +286,13 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
}
-
@Override
- protected void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ protected Map<Binary, DeliveryState> getLocalUnsettled()
{
- if(Boolean.TRUE.equals(settled))
- {
- _unsettledMap.remove(deliveryTag);
- }
+ return new HashMap<>(_unsettled);
}
+
@Override
public void attachReceived(final Attach attach) throws AmqpErrorException
{
@@ -433,26 +341,20 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
setCapabilities(targetCapabilities);
setDestination(destination);
- Map initialUnsettledMap = getInitialUnsettledMap();
- Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
- for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
+ Map remoteUnsettled = attach.getUnsettled();
+ Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
+ for(Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- if(!initialUnsettledMap.containsKey(deliveryTag))
+ if(remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
{
- _unsettledMap.remove(deliveryTag);
+ _unsettled.remove(deliveryTag); // todo: removal is based on assumption that remote unsettled map is complete
}
}
getLink().setTermini(source, target);
}
- @Override
- public void initialiseUnsettled()
- {
- _localUnsettled = new HashMap(_unsettledMap);
- }
-
public ReceivingDestination getReceivingDestination()
{
return _receivingDestination;
@@ -515,7 +417,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
attachReceived(attach);
- initialiseUnsettled();
}
@Override
@@ -528,21 +429,4 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
attachReceived(attach);
}
-
- @Override
- protected void detach(Error error, boolean close)
- {
- super.detach(error, close);
-
- if (_incompleteMessage != null)
- {
- for (Transfer t : _incompleteMessage)
- {
- t.dispose();
- }
- _incompleteMessage = null;
- }
- _messageDeliveryTag = null;
- _resumedMessage = false;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 25dd8bd..108943d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -19,8 +19,8 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -44,7 +44,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -52,7 +51,6 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator>
{
private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
- private ArrayList<Transfer> _incompleteMessage;
public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link)
{
@@ -67,43 +65,11 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
}
@Override
- protected Error messageTransfer(Transfer xfr)
+ protected Error receiveDelivery(Delivery delivery)
{
- List<QpidByteBuffer> payload = new ArrayList<>();
+ List<QpidByteBuffer> payload = delivery.getPayload();
- final Binary deliveryTag = xfr.getDeliveryTag();
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<Transfer>();
- _incompleteMessage.add(xfr);
- return null;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return null;
- }
-
- for(Transfer t : _incompleteMessage)
- {
- final List<QpidByteBuffer> bufs = t.getPayload();
- if(bufs != null)
- {
- payload.addAll(bufs);
- }
- t.dispose();
- }
- _incompleteMessage=null;
-
- }
- else
- {
- payload.addAll(xfr.getPayload());
- xfr.dispose();
- }
// Only interested in the amqp-value section that holds the message to the coordinator
try
@@ -132,7 +98,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
session.incrementStartedTransactions();
state.setTxnId(session.integerToBinary(txn.getId()));
- updateDisposition(deliveryTag, state, true);
+ updateDisposition(delivery.getDeliveryTag(), state, true);
}
else if(command instanceof Discharge)
@@ -159,7 +125,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
if (error == null)
{
- updateDisposition(deliveryTag, outcome, true);
+ updateDisposition(delivery.getDeliveryTag(), outcome, true);
}
return error;
}
@@ -250,6 +216,12 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
}
@Override
+ protected Map<Binary, DeliveryState> getLocalUnsettled()
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
protected void reattachLink(final Attach attach) throws AmqpErrorException
{
throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot reattach a Coordinator Link."));
@@ -283,20 +255,10 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
}
@Override
- protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
-
- }
-
- @Override
public void attachReceived(final Attach attach) throws AmqpErrorException
{
super.attachReceived(attach);
setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
}
- @Override
- public void initialiseUnsettled()
- {
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
index 7a47703..55588a1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
@@ -131,7 +131,7 @@ public class ValueHandler implements DescribedTypeConstructorRegistry.Source
{
position--;
}
- originalPositions[i] = position;
+ originalPositions[i - firstBufferWithAvailable] = position;
}
Object descriptor = parse(in);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
new file mode 100644
index 0000000..8a054fd
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public interface DeliveryRegistry
+{
+ void addDelivery(UnsignedInteger deliveryId, UnsettledDelivery unsettledDelivery);
+ void removeDelivery(UnsignedInteger deliveryId);
+ UnsettledDelivery getDelivery(UnsignedInteger deliveryId);
+ void removeDeliveriesForLinkEndpoint(LinkEndpoint<?, ?> linkEndpoint);
+ UnsignedInteger getDeliveryIdByTag(Binary deliveryTag);
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
new file mode 100644
index 0000000..466164f
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public class DeliveryRegistryImpl implements DeliveryRegistry
+{
+ private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>();
+ private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
+
+ @Override
+ public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery)
+ {
+ _deliveries.put(deliveryId, unsettledDelivery);
+ _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId);
+ }
+
+ @Override
+ public void removeDelivery(final UnsignedInteger deliveryId)
+ {
+ UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId);
+ if (unsettledDelivery != null)
+ {
+ _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+ }
+ }
+
+ @Override
+ public UnsettledDelivery getDelivery(final UnsignedInteger deliveryId)
+ {
+ return _deliveries.get(deliveryId);
+ }
+
+ @Override
+ public void removeDeliveriesForLinkEndpoint(final LinkEndpoint<?, ?> linkEndpoint)
+ {
+ Iterator<UnsettledDelivery> iterator = _deliveries.values().iterator();
+ while (iterator.hasNext())
+ {
+ UnsettledDelivery unsettledDelivery = iterator.next();
+ if (unsettledDelivery.getLinkEndpoint() == linkEndpoint)
+ {
+ iterator.remove();
+ _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+ }
+ }
+ }
+
+ @Override
+ public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag)
+ {
+ return _deliveryIds.get(deliveryTag);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-7842 : [AMQP 1.0] Refactor
transfer functionality
Posted by or...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
new file mode 100644
index 0000000..48d2ab1
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/UnsettledDelivery.java
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+
+public class UnsettledDelivery
+{
+ private final Binary _deliveryTag;
+ private final LinkEndpoint<?,?> _linkEndpoint;
+
+ public UnsettledDelivery(final Binary deliveryTag, final LinkEndpoint<?, ?> linkEndpoint)
+ {
+ _deliveryTag = deliveryTag;
+ _linkEndpoint = linkEndpoint;
+ }
+
+ public Binary getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ public LinkEndpoint<?, ?> getLinkEndpoint()
+ {
+ return _linkEndpoint;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
index 968ce4e..ee66d7b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/Outcome.java
@@ -21,7 +21,7 @@
package org.apache.qpid.server.protocol.v1_0.type;
-public interface Outcome
+public interface Outcome extends DeliveryState
{
Symbol getSymbol();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
index 3bae54d..e8b404a 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Attach.java
@@ -30,6 +30,8 @@ import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
@@ -62,7 +64,7 @@ public class Attach implements FrameBody
private BaseTarget _target;
@CompositeTypeField
- private Map _unsettled;
+ private Map<Binary, DeliveryState> _unsettled;
@CompositeTypeField
private Boolean _incompleteUnsettled;
@@ -80,7 +82,7 @@ public class Attach implements FrameBody
private Symbol[] _desiredCapabilities;
@CompositeTypeField
- private Map _properties;
+ private Map<Symbol, Object> _properties;
public String getName()
{
@@ -152,7 +154,7 @@ public class Attach implements FrameBody
_target = target;
}
- public Map getUnsettled()
+ public Map<Binary, DeliveryState> getUnsettled()
{
return _unsettled;
}
@@ -212,12 +214,12 @@ public class Attach implements FrameBody
_desiredCapabilities = desiredCapabilities;
}
- public Map getProperties()
+ public Map<Symbol, Object> getProperties()
{
return _properties;
}
- public void setProperties(Map properties)
+ public void setProperties(Map<Symbol, Object> properties)
{
_properties = properties;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
index 0e0489a..001b5fe 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Begin.java
@@ -60,7 +60,7 @@ public class Begin implements FrameBody
private Symbol[] _desiredCapabilities;
@CompositeTypeField
- private Map _properties;
+ private Map<Symbol, Object> _properties;
public UnsignedShort getRemoteChannel()
{
@@ -132,12 +132,12 @@ public class Begin implements FrameBody
_desiredCapabilities = desiredCapabilities;
}
- public Map getProperties()
+ public Map<Symbol, Object> getProperties()
{
return _properties;
}
- public void setProperties(Map properties)
+ public void setProperties(Map<Symbol, Object> properties)
{
_properties = properties;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
index 1b8722a..8b13cf3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Disposition.java
@@ -26,6 +26,7 @@ package org.apache.qpid.server.protocol.v1_0.type.transport;
import java.nio.ByteBuffer;
import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
+import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -34,16 +35,22 @@ public class Disposition implements FrameBody
{
private ByteBuffer _payload;
+ @CompositeTypeField(mandatory = true)
private Role _role;
+ @CompositeTypeField(mandatory = true)
private UnsignedInteger _first;
+ @CompositeTypeField
private UnsignedInteger _last;
+ @CompositeTypeField
private Boolean _settled;
+ @CompositeTypeField
private DeliveryState _state;
+ @CompositeTypeField
private Boolean _batchable;
public Role getRole()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
index 63b567e..8c87c7b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
@@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
public class Flow implements FrameBody
@@ -66,7 +67,7 @@ public class Flow implements FrameBody
private Boolean _echo;
@CompositeTypeField
- private Map _properties;
+ private Map<Symbol, Object> _properties;
public UnsignedInteger getNextIncomingId()
{
@@ -168,12 +169,12 @@ public class Flow implements FrameBody
_echo = echo;
}
- public Map getProperties()
+ public Map<Symbol, Object> getProperties()
{
return _properties;
}
- public void setProperties(Map properties)
+ public void setProperties(Map<Symbol, Object> properties)
{
_properties = properties;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
index 261543f..303cc28 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/BrokerAdmin.java
@@ -41,6 +41,7 @@ public interface BrokerAdmin extends Pluggable
void createQueue(String queueName);
void deleteQueue(String queueName);
void putMessageOnQueue(String queueName, String... messages);
+ int getQueueDepthMessages(String testQueueName);
boolean supportsRestart();
ListenableFuture<Void> restart();
@@ -48,10 +49,13 @@ public interface BrokerAdmin extends Pluggable
boolean isSASLSupported();
boolean isSASLMechanismSupported(String mechanismName);
boolean isWebSocketSupported();
+ boolean isQueueDepthSupported();
String getValidUsername();
String getValidPassword();
+
+
enum PortType
{
ANONYMOUS_AMQP,
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
index 2680246..e4dd859 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/EmbeddedBrokerPerClassAdminImpl.java
@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.SystemLauncher;
import org.apache.qpid.server.SystemLauncherListener;
import org.apache.qpid.server.logging.logback.LogbackLoggingSystemLauncherListener;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Container;
import org.apache.qpid.server.model.IllegalStateTransitionException;
@@ -306,6 +307,13 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
}
@Override
+ public int getQueueDepthMessages(final String testQueueName)
+ {
+ Queue queue = _currentVirtualHostNode.getVirtualHost().getChildByName(Queue.class, testQueueName);
+ return queue.getQueueDepthMessages();
+ }
+
+ @Override
public boolean supportsRestart()
{
return _isPersistentStore;
@@ -348,6 +356,12 @@ public class EmbeddedBrokerPerClassAdminImpl implements BrokerAdmin
}
@Override
+ public boolean isQueueDepthSupported()
+ {
+ return true;
+ }
+
+ @Override
public String getValidUsername()
{
return "guest";
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
index 045df9b..a580333 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/ExternalQpidBrokerAdminImpl.java
@@ -95,6 +95,12 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
}
@Override
+ public int getQueueDepthMessages(final String testQueueName)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public boolean supportsRestart()
{
return false;
@@ -119,6 +125,12 @@ public class ExternalQpidBrokerAdminImpl implements BrokerAdmin
}
@Override
+ public boolean isQueueDepthSupported()
+ {
+ return false;
+ }
+
+ @Override
public boolean isSASLMechanismSupported(final String mechanismName)
{
return true;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
index eac8770..9da8a8e 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Interaction.java
@@ -22,8 +22,14 @@ package org.apache.qpid.tests.protocol.v1_0;
import static com.google.common.util.concurrent.Futures.allAsList;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -38,27 +44,39 @@ import java.util.concurrent.TimeoutException;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue;
import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit;
import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class Interaction
@@ -71,6 +89,7 @@ public class Interaction
private final Detach _detach;
private final Flow _flow;
private final Transfer _transfer;
+ private final Disposition _disposition;
private final FrameTransport _transport;
private final SaslInit _saslInit;
private final SaslResponse _saslResponse;
@@ -79,6 +98,9 @@ public class Interaction
private UnsignedShort _sessionChannel;
private Response<?> _latestResponse;
private ListenableFuture<?> _latestFuture;
+ private int _deliveryIdCounter;
+ private List<Transfer> _latestDelivery;
+ private Object _decodedLatestDelivery;
Interaction(final FrameTransport frameTransport)
{
@@ -120,7 +142,10 @@ public class Interaction
_transfer = new Transfer();
_transfer.setHandle(defaultLinkHandle);
_transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8)));
- _transfer.setDeliveryId(UnsignedInteger.ZERO);
+ _transfer.setDeliveryId(getNextDeliveryId());
+
+ _disposition = new Disposition();
+ _disposition.setFirst(UnsignedInteger.ZERO);
}
/////////////////////////
@@ -316,6 +341,12 @@ public class Interaction
return this;
}
+ public Interaction attachSndSettleMode(final SenderSettleMode senderSettleMode)
+ {
+ _attach.setSndSettleMode(senderSettleMode);
+ return this;
+ }
+
public Interaction attachSource(Source source)
{
_attach.setSource(source);
@@ -344,6 +375,14 @@ public class Interaction
return this;
}
+ public Interaction attachSourceDefaultOutcome(final Outcome defaultOutcome)
+ {
+ Source source = ((Source) _attach.getSource());
+ source.setDefaultOutcome(defaultOutcome);
+ _attach.setSource(source);
+ return this;
+ }
+
public Interaction attachTargetAddress(final String address)
{
final Target target = ((Target) _attach.getTarget());
@@ -434,6 +473,12 @@ public class Interaction
return this;
}
+ public Interaction flowProperties(Map<Symbol, Object> properties)
+ {
+ _flow.setProperties(properties);
+ return this;
+ }
+
public Interaction flow() throws Exception
{
sendPerformativeAndChainFuture(_flow, _sessionChannel);
@@ -456,6 +501,19 @@ public class Interaction
return this;
}
+ public Interaction transferState(final DeliveryState state)
+ {
+ _transfer.setState(state);
+ return this;
+ }
+
+ public Interaction transferTransactionalState(final Binary transactionalId)
+ {
+ TransactionalState transactionalState = new TransactionalState();
+ transactionalState.setTxnId(transactionalId);
+ return transferState(transactionalState);
+ }
+
public Interaction transferDeliveryId(final UnsignedInteger deliveryId)
{
_transfer.setDeliveryId(deliveryId);
@@ -474,13 +532,19 @@ public class Interaction
return this;
}
+ public Interaction transferAborted(final Boolean aborted)
+ {
+ _transfer.setAborted(aborted);
+ return this;
+ }
+
public Interaction transferMessageFormat(final UnsignedInteger messageFormat)
{
_transfer.setMessageFormat(messageFormat);
return this;
}
- public Interaction transferPayload(final List<QpidByteBuffer> payload)
+ public Interaction setPayloadOnTransfer(final List<QpidByteBuffer> payload)
{
_transfer.setPayload(payload);
return this;
@@ -488,17 +552,21 @@ public class Interaction
public Interaction transferPayloadData(final Object payload)
{
+ setPayloadOnTransfer(_transfer, payload);
+ return this;
+ }
+
+ private void setPayloadOnTransfer(final Transfer transfer, final Object payload)
+ {
AmqpValue amqpValue = new AmqpValue(payload);
final AmqpValueSection section = amqpValue.createEncodingRetainingSection();
final List<QpidByteBuffer> encodedForm = section.getEncodedForm();
- _transfer.setPayload(encodedForm);
-
+ transfer.setPayload(encodedForm);
section.dispose();
for (QpidByteBuffer qbb : encodedForm)
{
qbb.dispose();
}
- return this;
}
public Interaction transferSettled(final Boolean settled)
@@ -513,6 +581,126 @@ public class Interaction
return this;
}
+ /////////////////
+ // disposition //
+ /////////////////
+
+ public Interaction dispositionSettled(final boolean settled)
+ {
+ _disposition.setSettled(settled);
+ return this;
+ }
+
+ public Interaction dispositionState(final DeliveryState state)
+ {
+ _disposition.setState(state);
+ return this;
+ }
+
+
+ public Interaction dispositionTransactionalState(final Binary transactionId, final Outcome outcome)
+ {
+ TransactionalState state = new TransactionalState();
+ state.setTxnId(transactionId);
+ state.setOutcome(outcome);
+ return dispositionState(state);
+ }
+
+ public Interaction dispositionRole(final Role role)
+ {
+ _disposition.setRole(role);
+ return this;
+ }
+
+ public Interaction disposition() throws Exception
+ {
+ sendPerformativeAndChainFuture(_disposition, _sessionChannel);
+ return this;
+ }
+
+ /////////////////
+ // transaction //
+ ////////////////
+
+ public Interaction txnAttachCoordinatorLink(InteractionTransactionalState transactionalState) throws Exception
+ {
+ Attach attach = new Attach();
+ attach.setName("testTransactionCoordinator-" + transactionalState.getHandle());
+ attach.setHandle(transactionalState.getHandle());
+ attach.setInitialDeliveryCount(UnsignedInteger.ZERO);
+ attach.setTarget(new Coordinator());
+ attach.setRole(Role.SENDER);
+ Source source = new Source();
+ attach.setSource(source);
+ source.setOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL);
+ sendPerformativeAndChainFuture(attach, _sessionChannel);
+ consumeResponse(Attach.class);
+ consumeResponse(Flow.class);
+ return this;
+ }
+
+ public Interaction txnDeclare(final InteractionTransactionalState txnState) throws Exception
+ {
+ Transfer transfer = createTransactionTransfer(txnState.getHandle());
+ setPayloadOnTransfer(transfer, new Declare());
+ sendPerformativeAndChainFuture(transfer, _sessionChannel);
+ consumeResponse(Disposition.class);
+ Disposition declareTransactionDisposition = getLatestResponse(Disposition.class);
+ assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
+ assertThat(declareTransactionDisposition.getState(), is(instanceOf(Declared.class)));
+ Binary transactionId = ((Declared) declareTransactionDisposition.getState()).getTxnId();
+ assertThat(transactionId, is(notNullValue()));
+ consumeResponse(Flow.class);
+ txnState.setLastTransactionId(transactionId);
+ return this;
+ }
+
+ public Interaction txnDischarge(final InteractionTransactionalState txnState, boolean failed) throws Exception
+ {
+ final Discharge discharge = new Discharge();
+ discharge.setTxnId(txnState.getCurrentTransactionId());
+ discharge.setFail(failed);
+
+ Transfer transfer = createTransactionTransfer(txnState.getHandle());
+ setPayloadOnTransfer(transfer, discharge);
+ sendPerformativeAndChainFuture(transfer, _sessionChannel);
+
+ Disposition declareTransactionDisposition = null;
+ Flow coordinatorFlow = null;
+ do
+ {
+ consumeResponse(Disposition.class, Flow.class);
+ Response<?> response = getLatestResponse();
+ if (response.getBody() instanceof Disposition)
+ {
+ declareTransactionDisposition = (Disposition) response.getBody();
+ }
+ if (response.getBody() instanceof Flow)
+ {
+ final Flow flowResponse = (Flow) response.getBody();
+ if (flowResponse.getHandle().equals(txnState.getHandle()))
+ {
+ coordinatorFlow = flowResponse;
+ }
+ }
+ } while(declareTransactionDisposition == null || coordinatorFlow == null);
+
+ assertThat(declareTransactionDisposition.getSettled(), is(equalTo(true)));
+ assertThat(declareTransactionDisposition.getState(), is(instanceOf(Accepted.class)));
+
+ txnState.setLastTransactionId(null);
+ return this;
+ }
+
+ private Transfer createTransactionTransfer(final UnsignedInteger handle)
+ {
+ Transfer transfer = new Transfer();
+ transfer.setHandle(handle);
+ transfer.setDeliveryId(getNextDeliveryId());
+ transfer.setDeliveryTag(new Binary(("transaction-" + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8)));
+ return transfer;
+ }
+
//////////
// misc //
//////////
@@ -567,11 +755,14 @@ public class Interaction
return this;
}
acceptableResponseClasses.remove(null);
- for (Class<?> acceptableResponseClass : acceptableResponseClasses)
+ if (_latestResponse != null)
{
- if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+ for (Class<?> acceptableResponseClass : acceptableResponseClasses)
{
- return this;
+ if (acceptableResponseClass.isAssignableFrom(_latestResponse.getBody().getClass()))
+ {
+ return this;
+ }
}
}
throw new IllegalStateException(String.format("Unexpected response. Expected one of '%s' got '%s'.",
@@ -613,4 +804,58 @@ public class Interaction
_flow.setHandle(_attach.getHandle());
return this;
}
+
+ private UnsignedInteger getNextDeliveryId()
+ {
+ return UnsignedInteger.valueOf(_deliveryIdCounter++);
+ }
+
+ public Interaction receiveDelivery() throws Exception
+ {
+ _latestDelivery = receiveAllTransfers();
+ return this;
+ }
+
+ public Interaction decodeLatestDelivery() throws AmqpErrorException
+ {
+ MessageDecoder messageDecoder = new MessageDecoder();
+ _latestDelivery.forEach(transfer ->
+ {
+ messageDecoder.addTransfer(transfer);
+ transfer.dispose();
+ });
+ _decodedLatestDelivery = messageDecoder.getData();
+ _latestDelivery = null;
+ return this;
+ }
+
+ public List<Transfer> getLatestDelivery()
+ {
+ return _latestDelivery;
+ }
+
+ public Object getDecodedLatestDelivery()
+ {
+ return _decodedLatestDelivery;
+ }
+
+ private List<Transfer> receiveAllTransfers() throws Exception
+ {
+ List<Transfer> transfers = new ArrayList<>();
+ boolean hasMore;
+ do
+ {
+ Transfer responseTransfer = consumeResponse().getLatestResponse(Transfer.class);
+ hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
+ transfers.add(responseTransfer);
+ }
+ while (hasMore);
+
+ return transfers;
+ }
+
+ public InteractionTransactionalState createTransactionalState(final UnsignedInteger handle)
+ {
+ return new InteractionTransactionalState(handle);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
new file mode 100644
index 0000000..061be92
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InteractionTransactionalState.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.tests.protocol.v1_0;
+
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public class InteractionTransactionalState
+{
+ private final UnsignedInteger _handle;
+ private Binary _lastTransactionId;
+
+ public InteractionTransactionalState(final UnsignedInteger handle)
+ {
+ _handle = handle;
+ }
+
+ public UnsignedInteger getHandle()
+ {
+ return _handle;
+ }
+
+ public void setLastTransactionId(final Binary lastTransactionId)
+ {
+ _lastTransactionId = lastTransactionId;
+ }
+
+ public Binary getCurrentTransactionId()
+ {
+ return _lastTransactionId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
index bfe05b8..446c112 100644
--- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
+++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/Utils.java
@@ -30,7 +30,6 @@ import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class Utils
{
@@ -81,19 +80,11 @@ public class Utils
.flowNextOutgoingId(UnsignedInteger.ZERO)
.flowLinkCredit(UnsignedInteger.ONE)
.flowHandleFromLinkHandle()
- .flow();
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
- MessageDecoder messageDecoder = new MessageDecoder();
- boolean hasMore;
- do
- {
- Transfer responseTransfer = interaction.consumeResponse().getLatestResponse(Transfer.class);
- messageDecoder.addTransfer(responseTransfer);
- hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
- }
- while (hasMore);
-
- return messageDecoder.getData();
+ return interaction.getDecodedLatestDelivery();
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
new file mode 100644
index 0000000..2f83a13
--- /dev/null
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/MultiTransferTest.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.tests.protocol.v1_0.messaging;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isOneOf;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
+import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
+import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
+import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
+import org.apache.qpid.tests.protocol.v1_0.Response;
+import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
+
+public class MultiTransferTest extends ProtocolTestBase
+{
+ private InetSocketAddress _brokerAddress;
+ private String _originalMmsMessageStorePersistence;
+
+ @Before
+ public void setUp()
+ {
+ _originalMmsMessageStorePersistence = System.getProperty("qpid.tests.mms.messagestore.persistence");
+ System.setProperty("qpid.tests.mms.messagestore.persistence", "false");
+
+ getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
+ _brokerAddress = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ }
+
+ @After
+ public void tearDown()
+ {
+ if (_originalMmsMessageStorePersistence != null)
+ {
+ System.setProperty("qpid.tests.mms.messagestore.persistence", _originalMmsMessageStorePersistence);
+ }
+ else
+ {
+ System.clearProperty("qpid.tests.mms.messagestore.persistence");
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.14",
+ description = "For messages that are too large to fit within the maximum frame size, additional data MAY"
+ + " be transferred in additional transfer frames by setting the more flag on all"
+ + " but the last transfer frame")
+ public void multiTransferMessage() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ QpidByteBuffer[] payloads = splitPayload("testData", 2);
+
+ final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
+ final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
+
+ Interaction interaction = transport.newInteraction();
+ Disposition disposition = interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+ .transferDeliveryId(deliveryId)
+ .transferDeliveryTag(deliveryTag)
+ .transferMore(true)
+ .transfer()
+ .sync()
+ .transferMore(false)
+ .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+ .transfer()
+ .consumeResponse()
+ .getLatestResponse(Disposition.class);
+
+ assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
+ assertThat(disposition.getLast(), isOneOf(null, deliveryId));
+ assertThat(disposition.getSettled(), is(equalTo(false)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.7.5",
+ description = "[delivery-id] On continuation transfers the delivery-id MAY be omitted..."
+ + "[delivery-tag] field MUST be specified for the first transfer of a multi-transfer"
+ + " message and can only be omitted for continuation transfers.")
+ public void multiTransferMessageOmittingOptionalTagAndID() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ QpidByteBuffer[] payloads = splitPayload("testData", 4);
+ final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
+ final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
+
+ Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .transferDeliveryId(deliveryId)
+ .transferDeliveryTag(deliveryTag)
+ .transferMore(true)
+ .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+ .transfer()
+ .sync()
+ .transferDeliveryId(deliveryId)
+ .transferDeliveryTag(null)
+ .transferMore(true)
+ .setPayloadOnTransfer(Collections.singletonList(payloads[1]))
+ .transfer()
+ .sync()
+ .transferDeliveryId(null)
+ .transferDeliveryTag(deliveryTag)
+ .transferMore(true)
+ .setPayloadOnTransfer(Collections.singletonList(payloads[2]))
+ .transfer()
+ .sync()
+ .transferDeliveryId(null)
+ .transferDeliveryTag(null)
+ .transferMore(false)
+ .setPayloadOnTransfer(Collections.singletonList(payloads[3]))
+ .transfer()
+ .consumeResponse();
+
+ Disposition disposition = interaction.getLatestResponse(Disposition.class);
+
+ assertThat(disposition.getFirst(), is(equalTo(deliveryId)));
+ assertThat(disposition.getLast(), isOneOf(null, deliveryId));
+ assertThat(disposition.getSettled(), is(equalTo(false)));
+ assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
+ }
+ }
+
+
+ //
+
+ @Test
+ @SpecificationTest(section = "2.6.14",
+ description = "The sender MAY indicate an aborted attempt to deliver a message by setting the abort flag on the last transfer."
+ + "In this case the receiver MUST discard the message data that was transferred prior to the abort.")
+ public void abortMultiTransferMessage() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ QpidByteBuffer[] payloads = splitPayload("testData", 2);
+
+ final UnsignedInteger deliveryId = UnsignedInteger.ZERO;
+ final Binary deliveryTag = new Binary("testTransfer".getBytes(UTF_8));
+
+ Interaction interaction = transport.newInteraction();
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .setPayloadOnTransfer(Collections.singletonList(payloads[0]))
+ .transferDeliveryId(deliveryId)
+ .transferDeliveryTag(deliveryTag)
+ .transferMore(true)
+ .transfer()
+ .sync()
+ .setPayloadOnTransfer(null)
+ .transferMore(null)
+ .transferAborted(true)
+ .transfer();
+
+ Response<?> latestResponse = interaction.consumeResponse(new Class<?>[] {null}).getLatestResponse();
+ assertThat(latestResponse, is(nullValue()));
+ }
+ }
+ @Test
+ @SpecificationTest(section = "2.6.14",
+ description = "[...]messages being transferred along different links MAY be interleaved")
+ public void multiTransferInterleaved() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
+ QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+
+ UnsignedInteger linkHandle1 = UnsignedInteger.ZERO;
+ UnsignedInteger linkHandle2 = UnsignedInteger.ONE;
+ Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
+ Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
+ UnsignedInteger deliverId1 = UnsignedInteger.ZERO;
+ UnsignedInteger deliveryId2 = UnsignedInteger.ONE;
+
+ Interaction interaction = transport.newInteraction();
+
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+
+ .attachName("testLink1")
+ .attachHandle(linkHandle1)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .attachName("testLink2")
+ .attachHandle(linkHandle2)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferHandle(linkHandle1)
+ .transferDeliveryId(deliverId1)
+ .transferDeliveryTag(deliveryTag1)
+ .transferMore(true)
+ .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+ .transfer()
+ .sync()
+
+ .transferHandle(linkHandle2)
+ .transferDeliveryId(deliveryId2)
+ .transferDeliveryTag(deliveryTag2)
+ .transferMore(true)
+ .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+ .transfer()
+ .sync()
+
+ .transferHandle(linkHandle1)
+ .transferDeliveryId(deliverId1)
+ .transferDeliveryTag(deliveryTag1)
+ .transferMore(false)
+ .setPayloadOnTransfer(Collections.singletonList(messagePayload1[1]))
+ .transfer()
+ .sync()
+
+ .transferHandle(linkHandle2)
+ .transferDeliveryId(deliveryId2)
+ .transferDeliveryTag(deliveryTag2)
+ .transferMore(false)
+ .setPayloadOnTransfer(Collections.singletonList(messagePayload2[1]))
+ .transfer()
+ .sync();
+
+ Map<UnsignedInteger, Disposition> dispositionMap = new HashMap<>();
+ for (int i = 0; i < 2; i++)
+ {
+ Disposition disposition = interaction.consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+ dispositionMap.put(disposition.getFirst(), disposition);
+
+ assertThat(disposition.getLast(), isOneOf(null, disposition.getFirst()));
+ assertThat(disposition.getSettled(), is(equalTo(false)));
+ assertThat(disposition.getState(), is(instanceOf(Accepted.class)));
+ }
+
+ assertThat("Unexpected number of dispositions", dispositionMap.size(), equalTo(2));
+ assertThat(dispositionMap.containsKey(deliverId1), is(true));
+ assertThat(dispositionMap.containsKey(deliveryId2), is(true));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.14",
+ description = "[...]messages transferred along a single link MUST NOT be interleaved")
+ public void illegallyInterleavedMultiTransferOnSingleLink() throws Exception
+ {
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ QpidByteBuffer[] messagePayload1 = splitPayload("testData1", 2);
+ QpidByteBuffer[] messagePayload2 = splitPayload("testData2", 2);
+
+ Binary deliveryTag1 = new Binary("testTransfer1".getBytes(UTF_8));
+ Binary deliveryTag2 = new Binary("testTransfer2".getBytes(UTF_8));
+ UnsignedInteger deliverId1 = UnsignedInteger.ZERO;
+ UnsignedInteger deliveryId2 = UnsignedInteger.ONE;
+
+ Interaction interaction = transport.newInteraction();
+
+ interaction.negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+
+ .transferDeliveryId(deliverId1)
+ .transferDeliveryTag(deliveryTag1)
+ .transferMore(true)
+ .setPayloadOnTransfer(Collections.singletonList(messagePayload1[0]))
+ .transfer()
+ .sync()
+
+ .transferDeliveryId(deliveryId2)
+ .transferDeliveryTag(deliveryTag2)
+ .transferMore(true)
+ .setPayloadOnTransfer(Collections.singletonList(messagePayload2[0]))
+ .transfer()
+ .sync();
+
+ interaction.consumeResponse(Detach.class, End.class, Close.class);
+ }
+ }
+
+ private QpidByteBuffer[] splitPayload(final String messageContent, int numberOfParts)
+ {
+ MessageEncoder messageEncoder = new MessageEncoder();
+ final Header header = new Header();
+ messageEncoder.setHeader(header);
+ messageEncoder.addData(messageContent);
+ List<QpidByteBuffer> payload = messageEncoder.getPayload();
+ long size = QpidByteBufferUtils.remaining(payload);
+
+ QpidByteBuffer[] result = new QpidByteBuffer[numberOfParts];
+ int chunkSize = (int) size / numberOfParts;
+ int lastChunkSize = (int) size - chunkSize * (numberOfParts - 1);
+ for (int i = 0; i < numberOfParts; i++)
+ {
+ result[i] = QpidByteBuffer.allocate(false, i == numberOfParts - 1 ? lastChunkSize : chunkSize);
+ }
+
+ int currentBufferIndex = 0;
+ for (QpidByteBuffer p : payload)
+ {
+ final int limit = p.limit();
+
+ while (p.hasRemaining())
+ {
+ QpidByteBuffer currentBuffer = result[currentBufferIndex];
+ if (currentBuffer.hasRemaining())
+ {
+ int length = Math.min(p.remaining(), currentBuffer.remaining());
+ p.limit(p.position() + length);
+ currentBuffer.put(p.slice());
+ p.position(p.position() + length);
+ p.limit(limit);
+ }
+
+ if (!currentBuffer.hasRemaining())
+ {
+ currentBuffer.flip();
+ currentBufferIndex++;
+ }
+ }
+ }
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
index cb6adbe..6ff5d5f 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/messaging/TransferTest.java
@@ -20,15 +20,20 @@
package org.apache.qpid.tests.protocol.v1_0.messaging;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.junit.Assume.assumeThat;
import java.net.InetSocketAddress;
-import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -38,6 +43,7 @@ import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Header;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Received;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected;
import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -45,14 +51,18 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Begin;
import org.apache.qpid.server.protocol.v1_0.type.transport.Close;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition;
+import org.apache.qpid.server.protocol.v1_0.type.transport.End;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Open;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin;
import org.apache.qpid.tests.protocol.v1_0.FrameTransport;
import org.apache.qpid.tests.protocol.v1_0.Interaction;
+import org.apache.qpid.tests.protocol.v1_0.MessageDecoder;
import org.apache.qpid.tests.protocol.v1_0.MessageEncoder;
import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase;
import org.apache.qpid.tests.protocol.v1_0.Response;
@@ -60,6 +70,7 @@ import org.apache.qpid.tests.protocol.v1_0.SpecificationTest;
public class TransferTest extends ProtocolTestBase
{
+ public static final String TEST_MESSAGE_DATA = "foo";
private InetSocketAddress _brokerAddress;
private String _originalMmsMessageStorePersistence;
@@ -109,7 +120,6 @@ public class TransferTest extends ProtocolTestBase
}
}
- @Ignore("QPID-7816")
@Test
@SpecificationTest(section = "2.7.5",
description = "[delivery-tag] MUST be specified for the first transfer "
@@ -118,21 +128,18 @@ public class TransferTest extends ProtocolTestBase
{
try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
{
- Close responseClose = transport.newInteraction()
- .negotiateProtocol().consumeResponse()
- .open().consumeResponse(Open.class)
- .begin().consumeResponse(Begin.class)
- .attachRole(Role.SENDER)
- .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
- .attach().consumeResponse(Attach.class)
- .consumeResponse(Flow.class)
- .transferDeliveryTag(null)
- .transferPayloadData("testData")
- .transfer()
- .consumeResponse()
- .getLatestResponse(Close.class);
- assertThat(responseClose.getError(), is(notNullValue()));
- assertThat(responseClose.getError().getCondition(), equalTo(AmqpError.INVALID_FIELD));
+ Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse(Open.class)
+ .begin().consumeResponse(Begin.class)
+ .attachRole(Role.SENDER)
+ .attachTargetAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse(Attach.class)
+ .consumeResponse(Flow.class)
+ .transferDeliveryTag(null)
+ .transferPayloadData("testData")
+ .transfer();
+ interaction.consumeResponse(Detach.class, End.class, Close.class);
}
}
@@ -237,7 +244,7 @@ public class TransferTest extends ProtocolTestBase
.sync();
final byte[] protocolResponse = interaction.consumeResponse().getLatestResponse(byte[].class);
- assertThat(protocolResponse, is(equalTo("AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8))));
+ assertThat(protocolResponse, is(equalTo("AMQP\0\1\0\0".getBytes(UTF_8))));
interaction.consumeResponse().getLatestResponse(Open.class);
interaction.consumeResponse().getLatestResponse(Begin.class);
@@ -275,7 +282,7 @@ public class TransferTest extends ProtocolTestBase
Rejected.REJECTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .transferPayload(messageEncoder.getPayload())
+ .setPayloadOnTransfer(messageEncoder.getPayload())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
.consumeResponse()
@@ -320,7 +327,7 @@ public class TransferTest extends ProtocolTestBase
.attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL)
.attach().consumeResponse(Attach.class)
.consumeResponse(Flow.class)
- .transferPayload(messageEncoder.getPayload())
+ .setPayloadOnTransfer(messageEncoder.getPayload())
.transferRcvSettleMode(ReceiverSettleMode.FIRST)
.transfer()
.consumeResponse()
@@ -345,4 +352,302 @@ public class TransferTest extends ProtocolTestBase
}
}
}
+
+ @Test
+ @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+ public void receiveTransferUnsettled() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
+
+ MessageDecoder messageDecoder = new MessageDecoder();
+ boolean hasMore;
+ do
+ {
+ Transfer responseTransfer = interaction.consumeResponse().getLatestResponse(Transfer.class);
+ messageDecoder.addTransfer(responseTransfer);
+ hasMore = Boolean.TRUE.equals(responseTransfer.getMore());
+ }
+ while (hasMore);
+
+ Object data = messageDecoder.getData();
+ assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+ public void receiveTransferReceiverSettleFirst() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+
+ interaction.dispositionSettled(true)
+ .dispositionRole(Role.RECEIVER)
+ .disposition();
+
+ // verify that no unexpected performative is received by closing
+ transport.doCloseConnection();
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+ public void receiveTransferReceiverSettleSecond() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+
+ Disposition disposition = interaction.dispositionSettled(false)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Accepted())
+ .disposition()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+ assertThat(disposition.getSettled(), is(true));
+
+ interaction.consumeResponse(null, Flow.class);
+
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+ public void receiveTransferReceiverSettleSecondWithRejectedOutcome() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachSourceOutcomes(Accepted.ACCEPTED_SYMBOL, Rejected.REJECTED_SYMBOL)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
+
+ Object data = interaction.receiveDelivery().decodeLatestDelivery().getDecodedLatestDelivery();
+ assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+
+ interaction.dispositionSettled(false)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Rejected())
+ .disposition()
+ .consumeResponse(Disposition.class, Flow.class);
+ Response<?> response = interaction.getLatestResponse();
+ if (response.getBody() instanceof Flow)
+ {
+ interaction.consumeResponse(Disposition.class);
+ }
+
+ Disposition disposition = interaction.getLatestResponse(Disposition.class);
+ assertThat(disposition.getSettled(), is(true));
+
+ interaction.consumeResponse(null, Flow.class);
+
+ }
+ }
+
+ @Ignore
+ @Test
+ @SpecificationTest(section = "2.6.12", description = "Transferring A Message.")
+ public void receiveTransferReceiverSettleSecondWithImplicitDispositionState() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attachSourceOutcomes()
+ .attachSourceDefaultOutcome(null)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+
+ Disposition disposition = interaction.dispositionSettled(false)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(null)
+ .disposition()
+ .consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+ assertThat(disposition.getSettled(), is(true));
+
+ interaction.consumeResponse(null, Flow.class);
+
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.6.12", description = "[...] the receiving application MAY wish to indicate"
+ + " non-terminal delivery states to the sender")
+ public void receiveTransferReceiverIndicateNonTerminalDeliveryState() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.SECOND)
+ .attach().consumeResponse()
+ .flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow()
+ .receiveDelivery()
+ .decodeLatestDelivery();
+
+ Object data = interaction.getDecodedLatestDelivery();
+ assertThat(data, Is.is(CoreMatchers.equalTo(TEST_MESSAGE_DATA)));
+
+ interaction.dispositionSettled(false)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Received())
+ .disposition();
+
+ Disposition disposition = interaction.dispositionSettled(false)
+ .dispositionRole(Role.RECEIVER)
+ .dispositionState(new Accepted())
+ .disposition().consumeResponse(Disposition.class)
+ .getLatestResponse(Disposition.class);
+ assertThat(disposition.getSettled(), is(true));
+
+ interaction.consumeResponse(null, Flow.class);
+ }
+ }
+
+
+ @Test
+ @SpecificationTest(section = "2.7.3", description = "The sender SHOULD respect the receiver’s desired settlement mode if"
+ + "the receiver initiates the attach exchange and the sender supports the desired mode.")
+ public void receiveTransferSenderSettleModeSettled() throws Exception
+ {
+ getBrokerAdmin().putMessageOnQueue(BrokerAdmin.TEST_QUEUE_NAME, TEST_MESSAGE_DATA);
+
+ try (FrameTransport transport = new FrameTransport(_brokerAddress).connect())
+ {
+ final Interaction interaction = transport.newInteraction()
+ .negotiateProtocol().consumeResponse()
+ .open().consumeResponse()
+ .begin().consumeResponse()
+ .attachRole(Role.RECEIVER)
+ .attachSourceAddress(BrokerAdmin.TEST_QUEUE_NAME)
+ .attachRcvSettleMode(ReceiverSettleMode.FIRST)
+ .attachSndSettleMode(SenderSettleMode.SETTLED)
+ .attach().consumeResponse(Attach.class);
+ Attach attach = interaction.getLatestResponse(Attach.class);
+ assumeThat(attach.getSndSettleMode(), is(equalTo(SenderSettleMode.SETTLED)));
+
+ interaction.flowIncomingWindow(UnsignedInteger.ONE)
+ .flowNextIncomingId(UnsignedInteger.ZERO)
+ .flowOutgoingWindow(UnsignedInteger.ZERO)
+ .flowNextOutgoingId(UnsignedInteger.ZERO)
+ .flowLinkCredit(UnsignedInteger.ONE)
+ .flowHandleFromLinkHandle()
+ .flow();
+
+ List<Transfer> transfers = interaction.receiveDelivery().getLatestDelivery();
+ final AtomicBoolean isSettled = new AtomicBoolean();
+ transfers.forEach(transfer -> { if (Boolean.TRUE.equals(transfer.getSettled())) { isSettled.set(true);}});
+
+ assertThat(isSettled.get(), is(true));
+
+ // verify no unexpected performative received by closing the connection
+ transport.doCloseConnection();
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
index e2f8b5e..f63d85e 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transaction/DischargeTest.java
@@ -143,7 +143,7 @@ public class DischargeTest extends ProtocolTestBase
final Detach detachResponse = interaction.transferDeliveryId(UnsignedInteger.ONE)
.transferDeliveryTag(new Binary("discharge".getBytes(UTF_8)))
.transferPayloadData(discharge)
- .transfer().consumeResponse()
+ .transfer().consumeResponse(Detach.class)
.getLatestResponse(Detach.class);
Error error = detachResponse.getError();
assertThat(error, is(notNullValue()));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org