You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/28 15:47:59 UTC
[3/3] qpid-broker-j git commit: QPID-7842 : [AMQP 1.0] Refactor
transfer functionality
QPID-7842 : [AMQP 1.0] Refactor transfer functionality
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/2e8efc0a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/2e8efc0a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/2e8efc0a
Branch: refs/heads/master
Commit: 2e8efc0a9cdc25bdd52589866f13f3e1b99bb8f0
Parents: d604344
Author: Alex Rudyy <or...@apache.org>
Authored: Thu Jun 22 15:25:01 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Wed Jun 28 16:14:05 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AbstractLinkEndpoint.java | 62 ++-
.../v1_0/AbstractReceivingLinkEndpoint.java | 256 ++++++----
.../protocol/v1_0/ConsumerTarget_1_0.java | 6 +-
.../qpid/server/protocol/v1_0/Delivery.java | 144 +++++-
.../protocol/v1_0/ErrantLinkEndpoint.java | 12 +-
.../qpid/server/protocol/v1_0/LinkEndpoint.java | 3 +-
.../protocol/v1_0/SendingLinkEndpoint.java | 197 ++++----
.../qpid/server/protocol/v1_0/Session_1_0.java | 155 +++---
.../v1_0/StandardReceivingLinkEndpoint.java | 162 +-----
.../TxnCoordinatorReceivingLinkEndpoint.java | 60 +--
.../protocol/v1_0/codec/ValueHandler.java | 2 +-
.../v1_0/delivery/DeliveryRegistry.java | 34 ++
.../v1_0/delivery/DeliveryRegistryImpl.java | 79 +++
.../v1_0/delivery/UnsettledDelivery.java | 46 ++
.../qpid/server/protocol/v1_0/type/Outcome.java | 2 +-
.../protocol/v1_0/type/transport/Attach.java | 12 +-
.../protocol/v1_0/type/transport/Begin.java | 6 +-
.../v1_0/type/transport/Disposition.java | 7 +
.../protocol/v1_0/type/transport/Flow.java | 7 +-
.../qpid/tests/protocol/v1_0/BrokerAdmin.java | 4 +
.../v1_0/EmbeddedBrokerPerClassAdminImpl.java | 14 +
.../v1_0/ExternalQpidBrokerAdminImpl.java | 12 +
.../qpid/tests/protocol/v1_0/Interaction.java | 261 +++++++++-
.../v1_0/InteractionTransactionalState.java | 50 ++
.../apache/qpid/tests/protocol/v1_0/Utils.java | 17 +-
.../v1_0/messaging/MultiTransferTest.java | 413 +++++++++++++++
.../protocol/v1_0/messaging/TransferTest.java | 345 ++++++++++++-
.../v1_0/transaction/DischargeTest.java | 2 +-
.../transaction/TransactionalTransferTest.java | 499 +++++++++++++++++++
29 files changed, 2272 insertions(+), 597 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 00f8724..a02a297 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -38,7 +38,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
@@ -52,24 +51,22 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractLinkEndpoint.class);
private final Link_1_0<S, T> _link;
private final Session_1_0 _session;
+
+ // todo: remove client specific part
private Object _flowTransactionId;
- private SenderSettleMode _sendingSettlementMode;
- private ReceiverSettleMode _receivingSettlementMode;
- private Map _initialUnsettledMap;
- private UnsignedInteger _lastSentCreditLimit;
+ private volatile SenderSettleMode _sendingSettlementMode;
+ private volatile ReceiverSettleMode _receivingSettlementMode;
+ private volatile UnsignedInteger _lastSentCreditLimit;
private volatile boolean _stopped;
private volatile boolean _stoppedUpdated;
- private Symbol[] _capabilities;
- private SequenceNumber _deliveryCount;
- private UnsignedInteger _linkCredit;
- private UnsignedInteger _available;
- private Boolean _drain;
- private UnsignedInteger _localHandle;
- private UnsignedLong _maxMessageSize;
- private Map<Symbol, Object> _properties;
-
- protected volatile State _state = State.ATTACH_RECVD;
- protected Map _localUnsettled;
+ private volatile Symbol[] _capabilities;
+ private volatile SequenceNumber _deliveryCount;
+ private volatile UnsignedInteger _linkCredit;
+ private volatile UnsignedInteger _available;
+ private volatile Boolean _drain;
+ private volatile UnsignedInteger _localHandle;
+ private volatile Map<Symbol, Object> _properties;
+ private volatile State _state = State.ATTACH_RECVD;
protected enum State
{
@@ -88,12 +85,13 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
_link = link;
}
- protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+ protected abstract void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
protected abstract void remoteDetachedPerformDetach(final Detach detach);
protected abstract Map<Symbol,Object> initProperties(final Attach attach);
+ protected abstract Map<Binary, DeliveryState> getLocalUnsettled();
@Override
public void receiveAttach(final Attach attach) throws AmqpErrorException
@@ -131,9 +129,17 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
{
_sendingSettlementMode = attach.getSndSettleMode();
_receivingSettlementMode = attach.getRcvSettleMode();
- _initialUnsettledMap = attach.getUnsettled();
_properties = initProperties(attach);
_state = State.ATTACH_RECVD;
+
+ if (getRole() == Role.RECEIVER)
+ {
+ getSession().getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
+ }
+ else
+ {
+ getSession().getOutgoingDeliveryRegistry().removeDeliveriesForLinkEndpoint(this);
+ }
}
public boolean isStopped()
@@ -229,20 +235,16 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
}
}
- public void addUnsettled(final Delivery unsettled)
- {
- }
-
@Override
- public void receiveDeliveryState(final Delivery unsettled,
+ public void receiveDeliveryState(final Binary deliveryTag,
final DeliveryState state,
final Boolean settled)
{
- handle(unsettled.getDeliveryTag(), state, settled);
+ handleDeliveryState(deliveryTag, state, settled);
if (Boolean.TRUE.equals(settled))
{
- settle(unsettled.getDeliveryTag());
+ settle(deliveryTag);
}
}
@@ -297,7 +299,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
attachToSend.setTarget(getTarget());
attachToSend.setSndSettleMode(getSendingSettlementMode());
attachToSend.setRcvSettleMode(getReceivingSettlementMode());
- attachToSend.setUnsettled(_localUnsettled);
+ attachToSend.setUnsettled(getLocalUnsettled());
attachToSend.setProperties(_properties);
attachToSend.setOfferedCapabilities(_capabilities);
@@ -496,13 +498,6 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
_capabilities = capabilities == null ? null : capabilities.toArray(new Symbol[capabilities.size()]);
}
- public Map getInitialUnsettledMap()
- {
- return _initialUnsettledMap;
- }
-
- public abstract void initialiseUnsettled();
-
@Override public String toString()
{
return "LinkEndpoint{" +
@@ -517,7 +512,6 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
", _available=" + _available +
", _drain=" + _drain +
", _localHandle=" + _localHandle +
- ", _maxMessageSize=" + _maxMessageSize +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index 4b55186..d159de5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
+import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
@@ -35,48 +36,21 @@ import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extends AbstractLinkEndpoint<Source, T>
{
private final SectionDecoder _sectionDecoder;
- private UnsignedInteger _lastDeliveryId;
- private Map<Binary, Object> _unsettledMap = new LinkedHashMap<>();
- private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<>();
- private boolean _creditWindow;
-
-
- private static class TransientState
- {
-
- UnsignedInteger _deliveryId;
- boolean _settled;
-
- private TransientState(final UnsignedInteger transferId)
- {
- _deliveryId = transferId;
- }
-
- public UnsignedInteger getDeliveryId()
- {
- return _deliveryId;
- }
-
- public boolean isSettled()
- {
- return _settled;
- }
-
- public void setSettled(boolean settled)
- {
- _settled = settled;
- }
- }
+ final Map<Binary, DeliveryState> _unsettled = Collections.synchronizedMap(new LinkedHashMap<>());
+ private volatile boolean _creditWindow;
+ private volatile Delivery _currentDelivery;
public AbstractReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, T> link)
{
@@ -98,73 +72,162 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
return Role.RECEIVER;
}
- Error receiveTransfer(final Transfer transfer, final Delivery delivery)
+ void receiveTransfer(final Transfer transfer)
{
if(isAttached())
{
- TransientState transientState;
- final Binary deliveryTag = delivery.getDeliveryTag();
- boolean existingState = _unsettledMap.containsKey(deliveryTag);
- if (!existingState || transfer.getState() != null)
+ if (!ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
+ && ReceiverSettleMode.SECOND.equals(transfer.getRcvSettleMode()))
{
- _unsettledMap.put(deliveryTag, transfer.getState());
+ Error error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
+ close(error);
+ return;
}
- if (!existingState)
+
+ if (_currentDelivery == null)
{
- transientState = new TransientState(transfer.getDeliveryId());
- if (delivery.isSettled())
+ Error error = validateNewTransfer(transfer);
+ if (error != null)
{
- transientState.setSettled(true);
+ close(error);
+ return;
}
- _unsettledIds.put(deliveryTag, transientState);
+ _currentDelivery = new Delivery(transfer, this);
+
setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
getDeliveryCount().incr();
+
+ getSession().getIncomingDeliveryRegistry()
+ .addDelivery(transfer.getDeliveryId(),
+ new UnsettledDelivery(transfer.getDeliveryTag(), this));
}
else
{
- transientState = _unsettledIds.get(deliveryTag);
- if (delivery.isSettled())
+ Error error = validateSubsequentTransfer(transfer);
+ if (error != null)
{
- transientState.setSettled(true);
+ close(error);
+ return;
}
+ _currentDelivery.addTransfer(transfer);
+ }
+
+ if (!_currentDelivery.getResume())
+ {
+ _unsettled.put(_currentDelivery.getDeliveryTag(), _currentDelivery.getState());
+ }
+ else if (!_unsettled.containsKey(_currentDelivery.getDeliveryTag()))
+ {
+ final Error error = new Error(AmqpError.ILLEGAL_STATE,
+ String.format("Resumed transfer with delivery tag '%s' is not found.",
+ _currentDelivery.getDeliveryTag()));
+ close(error);
+ return;
}
- if (transientState.isSettled() && delivery.isComplete())
+ if (_currentDelivery.isAborted())
{
- _unsettledMap.remove(deliveryTag);
+ _unsettled.remove(_currentDelivery.getDeliveryTag());
+ getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
+ _currentDelivery = null;
+
+ setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+ getDeliveryCount().decr();
+ }
+ else if (_currentDelivery.isComplete())
+ {
+ try
+ {
+ if (_currentDelivery.isSettled())
+ {
+ _unsettled.remove(_currentDelivery.getDeliveryTag());
+ getSession().getIncomingDeliveryRegistry().removeDelivery(_currentDelivery.getDeliveryId());
+ }
+ Error error = receiveDelivery(_currentDelivery);
+ if (error != null)
+ {
+ close(error);
+ }
+ }
+ finally
+ {
+ _currentDelivery = null;
+ }
}
- return messageTransfer(transfer);
}
else
{
+ // TODO: it is wrong
getSession().updateDisposition(Role.RECEIVER, transfer.getDeliveryId(), transfer.getDeliveryId(),null, true);
- return null;
}
}
- protected abstract Error messageTransfer(final Transfer transfer);
-
- @Override public void receiveFlow(final Flow flow)
+ private Error validateNewTransfer(final Transfer transfer)
{
- setAvailable(flow.getAvailable());
- setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
+ Error error = null;
+ if (transfer.getDeliveryId() == null)
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"delivery-id\" is required for a new delivery.");
+ }
+ else if (transfer.getDeliveryTag() == null)
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"delivery-tag\" is required for a new delivery.");
+ }
+ return error;
}
- public boolean settled(final Binary deliveryTag)
+ private Error validateSubsequentTransfer(final Transfer transfer)
{
- boolean deleted;
- if (deleted = (_unsettledIds.remove(deliveryTag) != null))
+ Error error = null;
+ if (transfer.getDeliveryId() != null && !_currentDelivery.getDeliveryId()
+ .equals(transfer.getDeliveryId()))
{
- _unsettledMap.remove(deliveryTag);
-
+ error = new Error(AmqpError.INVALID_FIELD,
+ String.format(
+ "Unexpected transfer \"delivery-id\" for multi-transfer delivery: found '%s', expected '%s'.",
+ transfer.getDeliveryId(),
+ _currentDelivery.getDeliveryId()));
}
+ else if (transfer.getDeliveryTag() != null && !_currentDelivery.getDeliveryTag()
+ .equals(transfer.getDeliveryTag()))
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ String.format(
+ "Unexpected transfer \"delivery-tag\" for multi-transfer delivery: found '%s', expected '%s'.",
+ transfer.getDeliveryTag(),
+ _currentDelivery.getDeliveryTag()));
+ }
+ else if (_currentDelivery.getReceiverSettleMode() != null && transfer.getRcvSettleMode() != null
+ && !_currentDelivery.getReceiverSettleMode().equals(transfer.getRcvSettleMode()))
+ {
+ error = new Error(AmqpError.INVALID_FIELD,
+ "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
+ }
+ return error;
+ }
+
+ protected abstract Error receiveDelivery(final Delivery delivery);
- return deleted;
+ @Override
+ public void receiveFlow(final Flow flow)
+ {
+ setAvailable(flow.getAvailable());
+ setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
+ }
+
+ private boolean settled(final Binary deliveryTag)
+ {
+ return _unsettled.remove(deliveryTag) != null;
}
- public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
+ public void updateDisposition(final Binary deliveryTag,
+ final DeliveryState state,
+ final boolean settled)
{
- if (_unsettledMap.containsKey(deliveryTag))
+ if (_unsettled.containsKey(deliveryTag))
{
boolean outcomeUpdate = false;
Outcome outcome = null;
@@ -174,27 +237,23 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
}
else if (state instanceof TransactionalState)
{
- // TODO? Is this correct
outcome = ((TransactionalState) state).getOutcome();
}
if (outcome != null)
{
- Object oldOutcome = _unsettledMap.put(deliveryTag, outcome);
- outcomeUpdate = !outcome.equals(oldOutcome);
+ if (!(_unsettled.get(deliveryTag) instanceof Outcome))
+ {
+ Object oldOutcome = _unsettled.put(deliveryTag, outcome);
+ outcomeUpdate = !outcome.equals(oldOutcome);
+ }
}
-
- TransientState transientState = _unsettledIds.get(deliveryTag);
if (outcomeUpdate || settled)
{
-
- final UnsignedInteger transferId = transientState.getDeliveryId();
-
- getSession().updateDisposition(getRole(), transferId, transferId, state, settled);
+ getSession().updateDisposition(getRole(), deliveryTag, state, settled);
}
-
if (settled)
{
@@ -212,33 +271,28 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
}
}
}
- else
+ else if (_creditWindow)
{
- TransientState transientState = _unsettledIds.get(deliveryTag);
- if (_creditWindow)
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- sendFlowConditional();
- }
-
+ setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
+ sendFlowConditional();
}
}
- public void setCreditWindow()
+ void setCreditWindow()
{
setCreditWindow(true);
}
- public void setCreditWindow(boolean window)
+ private void setCreditWindow(boolean window)
{
_creditWindow = window;
sendFlowConditional();
}
@Override
- public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
+ public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
- super.receiveDeliveryState(unsettled, state, settled);
+ super.receiveDeliveryState(deliveryTag, state, settled);
if(_creditWindow)
{
if(Boolean.TRUE.equals(settled))
@@ -258,8 +312,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
public void settle(Binary deliveryTag)
{
super.settle(deliveryTag);
- _unsettledIds.remove(deliveryTag);
- _unsettledMap.remove(deliveryTag);
+ _unsettled.remove(deliveryTag);
if(_creditWindow)
{
sendFlowConditional();
@@ -271,15 +324,32 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
{
}
- UnsignedInteger getLastDeliveryId()
+ @Override
+ protected void detach(final Error error, final boolean close)
{
- return _lastDeliveryId;
+ try
+ {
+ super.detach(error, close);
+ }
+ finally
+ {
+ if (close)
+ {
+ if (_currentDelivery != null)
+ {
+ _currentDelivery.discard();
+ _currentDelivery = null;
+ }
+ }
+ }
}
- void setLastDeliveryId(UnsignedInteger lastDeliveryId)
+ @Override
+ protected void handleDeliveryState(Binary deliveryTag, DeliveryState state, Boolean settled)
{
- _lastDeliveryId = lastDeliveryId;
+ if(Boolean.TRUE.equals(settled))
+ {
+ _unsettled.remove(deliveryTag);
+ }
}
-
-
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index a535fd2..3942ca8 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -79,7 +79,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
- private SendingLinkEndpoint _linkEndpoint;
+ private final SendingLinkEndpoint _linkEndpoint;
private final SectionEncoder _sectionEncoder;
private final StateChangeListener<MessageInstance, MessageInstance.EntryState> _unacknowledgedMessageListener = new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
@@ -356,7 +356,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
{
updateNotifyWorkDesired();
- if (isSuspended() && getEndpoint() != null)
+ if (_linkEndpoint != null)
{
_transactionId = _linkEndpoint.getTransactionId();
}
@@ -454,7 +454,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
else
{
- _linkEndpoint.updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
+ _linkEndpoint.updateDisposition(_deliveryTag, outcome, true);
}
_linkEndpoint.sendFlowConditional();
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
index a5fb4bc..8facf8f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
@@ -19,38 +19,57 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
public class Delivery
{
private final UnsignedInteger _deliveryId;
private final Binary _deliveryTag;
+ private final List<Transfer> _transfers = new CopyOnWriteArrayList<>();
private final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> _linkEndpoint;
- private boolean _complete;
- private boolean _settled;
- private int _numberOfTransfers = 0;
+ private final UnsignedInteger _messageFormat;
+ private volatile boolean _complete;
+ private volatile boolean _settled;
+ private volatile boolean _aborted;
+ private volatile DeliveryState _state;
+ private volatile ReceiverSettleMode _receiverSettleMode;
+ private volatile boolean _resume;
public Delivery(Transfer transfer, final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint)
{
- _settled = Boolean.TRUE.equals(transfer.getSettled());
_deliveryId = transfer.getDeliveryId();
_deliveryTag = transfer.getDeliveryTag();
_linkEndpoint = endpoint;
+ _messageFormat = transfer.getMessageFormat();
addTransfer(transfer);
}
- public boolean isComplete()
+ public UnsignedInteger getDeliveryId()
{
- return _complete;
+ return _deliveryId;
+ }
+
+ public Binary getDeliveryTag()
+ {
+ return _deliveryTag;
}
- public void setComplete(final boolean complete)
+ public boolean isComplete()
{
- _complete = complete;
+ return _complete;
}
public boolean isSettled()
@@ -58,27 +77,93 @@ public class Delivery
return _settled;
}
- public void setSettled(final boolean settled)
+ public boolean isAborted()
{
- _settled = settled;
+ return _aborted;
}
- public final void addTransfer(Transfer transfer)
+ public DeliveryState getState()
{
- _numberOfTransfers++;
- if(Boolean.TRUE.equals(transfer.getAborted()) || !Boolean.TRUE.equals(transfer.getMore()))
+ return _state;
+ }
+
+ public ReceiverSettleMode getReceiverSettleMode()
+ {
+ return _receiverSettleMode;
+ }
+
+ public UnsignedInteger getMessageFormat()
+ {
+ return _messageFormat;
+ }
+
+
+ public boolean getResume()
+ {
+ return _resume;
+ }
+
+ final void addTransfer(Transfer transfer)
+ {
+ if (_aborted)
+ {
+ throw new IllegalStateException(String.format("Delivery '%s/%d' is already aborted",
+ _deliveryTag,
+ _deliveryId.intValue()));
+ }
+
+ if (_complete)
+ {
+ throw new IllegalStateException(String.format("Delivery '%s/%d' is already completed",
+ _deliveryTag,
+ _deliveryId.intValue()));
+ }
+
+ _transfers.add(transfer);
+ if (Boolean.TRUE.equals(transfer.getAborted()))
{
- setComplete(true);
+ _aborted = true;
+ discard();
+ }
+ if(!Boolean.TRUE.equals(transfer.getMore()))
+ {
+ _complete = true;
}
if(Boolean.TRUE.equals(transfer.getSettled()))
{
- setSettled(true);
+ _settled = true;
}
- }
- public UnsignedInteger getDeliveryId()
- {
- return _deliveryId;
+ if(Boolean.TRUE.equals(transfer.getResume()))
+ {
+ _resume = true;
+ }
+
+ if (transfer.getState() != null)
+ {
+ DeliveryState currentState;
+ if (_state instanceof TransactionalState)
+ {
+ currentState = ((TransactionalState) _state).getOutcome();
+ }
+ else
+ {
+ currentState = _state;
+ }
+ if (!(currentState instanceof Outcome))
+ {
+ _state = transfer.getState();
+ }
+ }
+
+ if (transfer.getRcvSettleMode() != null)
+ {
+ if (_receiverSettleMode == null)
+ {
+ _receiverSettleMode = transfer.getRcvSettleMode();
+ }
+
+ }
}
public LinkEndpoint<? extends BaseSource, ? extends BaseTarget> getLinkEndpoint()
@@ -86,13 +171,26 @@ public class Delivery
return _linkEndpoint;
}
- public Binary getDeliveryTag()
+
+ public List<QpidByteBuffer> getPayload()
{
- return _deliveryTag;
+ List<QpidByteBuffer> fragments = new ArrayList<>(_transfers.size());
+ for (Transfer t : _transfers)
+ {
+ fragments.addAll(t.getPayload());
+ t.dispose();
+ }
+ _transfers.clear();
+ return fragments;
}
- public int getNumberOfTransfers()
+ public void discard()
{
- return _numberOfTransfers;
+ for (Transfer transfer: _transfers)
+ {
+ transfer.dispose();
+ }
+ _transfers.clear();
}
+
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
index 0d9daa7..287f038 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -144,6 +145,12 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
}
@Override
+ public void receiveDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ {
+
+ }
+
+ @Override
public void receiveFlow(final Flow flow)
{
throw new UnsupportedOperationException("This Link is errant");
@@ -173,9 +180,4 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
throw new UnsupportedOperationException("This Link is errant");
}
- @Override
- public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
- {
- throw new UnsupportedOperationException("This Link is errant");
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index 61a0199..94e3b1e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
@@ -52,7 +53,7 @@ public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
void remoteDetached(Detach detach);
- void receiveDeliveryState(Delivery unsettled,
+ void receiveDeliveryState(Binary deliveryTag,
DeliveryState state,
Boolean settled);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 78b6290..95dc34b 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -30,7 +30,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,7 +53,6 @@ import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.DeleteOnNoMessages;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
@@ -80,23 +78,20 @@ import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
+ private static final Symbol PRIORITY = Symbol.valueOf("priority");
- public static final Symbol PRIORITY = Symbol.valueOf("priority");
- private UnsignedInteger _lastDeliveryId;
- private Binary _lastDeliveryTag;
- private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<>();
- private Map<Binary, MessageInstance> _unsettledMap2 = new HashMap<>();
- private Binary _transactionId;
- private Integer _priority;
private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
+ private final Map<Binary, OutgoingDelivery> _unsettled = new ConcurrentHashMap<>();
+
+ private volatile Binary _transactionId;
+ private volatile Integer _priority;
private volatile boolean _draining = false;
- private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
- private SendingDestination _destination;
- private EnumSet<ConsumerOption> _consumerOptions;
- private FilterManager _consumerFilters;
- private ConsumerTarget_1_0 _consumerTarget;
- private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
+ private volatile SendingDestination _destination;
+ private volatile EnumSet<ConsumerOption> _consumerOptions;
+ private volatile FilterManager _consumerFilters;
+ private volatile ConsumerTarget_1_0 _consumerTarget;
+ private volatile MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl<Source, Target> link)
{
@@ -111,7 +106,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
}
- public void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
+ private void prepareConsumerOptionsAndFilters(final SendingDestination destination) throws AmqpErrorException
{
// TODO FIXME: this method might modify the source. this is not good encapsulation. furthermore if it does so then it should inform the link/linkregistry about it!
_destination = destination;
@@ -199,7 +194,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
_consumerFilters = filters;
}
- void createConsumerTarget() throws AmqpErrorException
+ private void createConsumerTarget() throws AmqpErrorException
{
final Source source = getSource();
_consumerTarget = new ConsumerTarget_1_0(this,
@@ -338,7 +333,6 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
attachReceived(attach);
- initialiseUnsettled();
}
@Override
@@ -372,17 +366,12 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
return Role.SENDER;
}
- public Integer getPriority()
+ private Integer getPriority()
{
return _priority;
}
- public TerminusDurability getTerminusDurability()
- {
- return getSource().getDurable();
- }
-
- public boolean transfer(final Transfer xfr, final boolean decrementCredit)
+ void transfer(final Transfer xfr, final boolean decrementCredit)
{
Session_1_0 s = getSession();
xfr.setMessageFormat(UnsignedInteger.ZERO);
@@ -395,27 +384,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
xfr.setHandle(getLocalHandle());
- s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag));
-
- if(!Boolean.TRUE.equals(xfr.getSettled()))
- {
- _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId());
- }
-
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- _lastDeliveryTag = xfr.getDeliveryTag();
- }
- else
- {
- _lastDeliveryTag = null;
- }
-
- return true;
+ s.sendTransfer(xfr, this, true);
}
- public boolean drained()
+ boolean drained()
{
if (_draining)
{
@@ -514,11 +487,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
Modified state = new Modified();
state.setDeliveryFailed(true);
- for (UnsettledAction action : _unsettledActionMap.values())
+ for (OutgoingDelivery delivery : _unsettled.values())
{
- action.process(state, Boolean.TRUE);
+ UnsettledAction action = delivery.getAction();
+ if (action != null)
+ {
+ action.process(state, Boolean.TRUE);
+ delivery.setAction(null);
+ }
}
- _unsettledActionMap.clear();
Error closingError = null;
if (getDestination() instanceof ExchangeDestination
@@ -558,20 +535,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
}
- public void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance queueEntry)
+ void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance messageInstance)
{
- _unsettledActionMap.put(tag, unsettledAction);
- if(getTransactionId() == null)
- {
- _unsettledMap2.put(tag, queueEntry);
- }
-
+ _unsettled.put(tag, new OutgoingDelivery(messageInstance, unsettledAction, null));
}
@Override
- protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
+ protected void handleDeliveryState(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
- UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+ UnsettledAction action = _unsettled.get(deliveryTag).getAction();
boolean localSettle = false;
if(action != null)
{
@@ -583,9 +555,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
if(Boolean.TRUE.equals(settled) || localSettle)
{
- _unsettledActionMap.remove(deliveryTag);
- _unsettledMap.remove(deliveryTag);
- _unsettledMap2.remove(deliveryTag);
+ _unsettled.remove(deliveryTag);
}
}
@@ -602,23 +572,11 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
&& getSession().hasCreditToSend();
}
- public UnsignedInteger getLastDeliveryId()
- {
- return _lastDeliveryId;
- }
-
- public void setLastDeliveryId(final UnsignedInteger deliveryId)
- {
- _lastDeliveryId = deliveryId;
- }
-
public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
{
- UnsignedInteger deliveryId;
- if (settled && (deliveryId = _unsettledMap.remove(deliveryTag)) != null)
+ if (settled && (_unsettled.remove(deliveryTag) != null))
{
- _unsettledMap2.remove(deliveryTag);
- getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
+ getSession().updateDisposition(getRole(), deliveryTag, state, settled);
}
}
@@ -677,33 +635,34 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
getLink().setTarget(target);
- final MessageInstanceConsumer consumer = getConsumer();
+ final MessageInstanceConsumer oldConsumer = getConsumer();
createConsumerTarget();
_resumeAcceptedTransfers.clear();
_resumeFullTransfers.clear();
final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
- Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
- Map initialUnsettledMap = getInitialUnsettledMap();
+ Map<Binary, OutgoingDelivery> unsettledCopy = new HashMap<>(_unsettled);
+ Map<Binary, DeliveryState> remoteUnsettled =
+ attach.getUnsettled() == null ? Collections.emptyMap() : new HashMap<>(attach.getUnsettled());
- for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+ for (Map.Entry<Binary, OutgoingDelivery> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- final MessageInstance queueEntry = entry.getValue();
- if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+ final MessageInstance queueEntry = entry.getValue().getMessageInstance();
+ if (remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
{
queueEntry.setRedelivered();
- queueEntry.release(consumer);
- _unsettledMap2.remove(deliveryTag);
+ queueEntry.release(oldConsumer);
+ _unsettled.remove(deliveryTag);
}
- else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
+ else if (remoteUnsettled.get(deliveryTag) instanceof Outcome)
{
- Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+ Outcome outcome = (Outcome) remoteUnsettled.get(deliveryTag);
if (outcome instanceof Accepted)
{
- AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
- if (consumer.acquires())
+ if (oldConsumer.acquires())
{
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
if (queueEntry.acquire() || queueEntry.isAcquired())
{
txn.dequeue(Collections.singleton(queueEntry),
@@ -723,15 +682,15 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
else if (outcome instanceof Released)
{
- AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
- if (consumer.acquires())
+ if (oldConsumer.acquires())
{
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
txn.dequeue(Collections.singleton(queueEntry),
new ServerTransaction.Action()
{
public void postCommit()
{
- queueEntry.release(consumer);
+ queueEntry.release(oldConsumer);
}
public void onRollback()
@@ -740,14 +699,17 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
});
}
}
- //_unsettledMap.remove(deliveryTag);
- initialUnsettledMap.remove(deliveryTag);
+
+ // TODO: Handle rejected and modified outcome
+
+ remoteUnsettled.remove(deliveryTag);
_resumeAcceptedTransfers.add(deliveryTag);
}
else
{
_resumeFullTransfers.add(queueEntry);
- // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+
+ // TODO: exists in receivers map, but not yet got an outcome ... should resend with resume = true
}
}
@@ -755,22 +717,22 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
}
@Override
- public void initialiseUnsettled()
+ protected Map<Binary, DeliveryState> getLocalUnsettled()
{
- Map<Binary, MessageInstance> _localUnsettled = new HashMap<>(_unsettledMap2);
-
- for (Map.Entry<Binary, MessageInstance> entry : _localUnsettled.entrySet())
+ Map<Binary, DeliveryState> unsettled = new HashMap<>();
+ for (Map.Entry<Binary, OutgoingDelivery> entry : _unsettled.entrySet())
{
- entry.setValue(null);
+ unsettled.put(entry.getKey(), entry.getValue().getLocalState());
}
+ return unsettled;
}
- public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
+ private MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
{
return _consumer;
}
- public ConsumerTarget_1_0 getConsumerTarget()
+ ConsumerTarget_1_0 getConsumerTarget()
{
return _consumerTarget;
}
@@ -784,4 +746,45 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
_destination = destination;
}
+
+ private static class OutgoingDelivery
+ {
+ private final MessageInstance _messageInstance;
+ private volatile UnsettledAction _action;
+ private volatile DeliveryState _localState;
+
+ public OutgoingDelivery(final MessageInstance messageInstance,
+ final UnsettledAction action,
+ final DeliveryState localState)
+ {
+ _messageInstance = messageInstance;
+ _action = action;
+ _localState = localState;
+ }
+
+ public MessageInstance getMessageInstance()
+ {
+ return _messageInstance;
+ }
+
+ public UnsettledAction getAction()
+ {
+ return _action;
+ }
+
+ public DeliveryState getLocalState()
+ {
+ return _localState;
+ }
+
+ public void setLocalState(final DeliveryState localState)
+ {
+ _localState = localState;
+ }
+
+ public void setAction(final UnsettledAction action)
+ {
+ _action = action;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 29f1a01..65a5265 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -35,7 +35,6 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -72,6 +71,9 @@ import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistry;
+import org.apache.qpid.server.protocol.v1_0.delivery.DeliveryRegistryImpl;
+import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
@@ -155,9 +157,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
private UnsignedInteger _lastSentIncomingLimit;
- private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
- private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
-
+ private final DeliveryRegistry _outgoingDeliveryRegistry = new DeliveryRegistryImpl();
+ private final DeliveryRegistry _incomingDeliveryRegistry = new DeliveryRegistryImpl();
private final Error _sessionEndedLinkError =
new Error(LinkError.DETACH_FORCED,
@@ -231,7 +232,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
}
- public void updateDisposition(final Role role,
+ void updateDisposition(final Role role,
final UnsignedInteger first,
final UnsignedInteger last,
final DeliveryState state, final boolean settled)
@@ -248,13 +249,13 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (settled)
{
- final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
- role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
+ final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+
SequenceNumber pos = new SequenceNumber(first.intValue());
SequenceNumber end = new SequenceNumber(last.intValue());
while (pos.compareTo(end) <= 0)
{
- unsettled.remove(UnsignedInteger.valueOf(pos.intValue()));
+ deliveryRegistry.removeDelivery(UnsignedInteger.valueOf(pos.intValue()));
pos.incr();
}
}
@@ -263,6 +264,21 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
//TODO - check send flow
}
+ void updateDisposition(final Role role,
+ final Binary deliveryTag,
+ final DeliveryState state,
+ final boolean settled)
+ {
+ final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+ UnsignedInteger deliveryId = deliveryRegistry.getDeliveryIdByTag(deliveryTag);
+ if (deliveryId == null)
+ {
+ throw new ConnectionScopedRuntimeException(String.format(
+ "Delivery with tag '%s' is not found in unsettled deliveries", deliveryTag));
+ }
+ updateDisposition(role, deliveryId, deliveryId, state, settled);
+ }
+
public boolean hasCreditToSend()
{
boolean b = _remoteIncomingWindow > 0;
@@ -283,32 +299,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (newDelivery)
{
deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
- endpoint.setLastDeliveryId(deliveryId);
+ xfr.setDeliveryId(deliveryId);
if (!settled)
{
- final Delivery delivery = new Delivery(xfr, endpoint);
- _outgoingUnsettled.put(deliveryId, delivery);
- endpoint.addUnsettled(delivery);
- }
- }
- else
- {
- deliveryId = endpoint.getLastDeliveryId();
- final Delivery delivery = _outgoingUnsettled.get(deliveryId);
- if (delivery != null)
- {
- if (!settled)
- {
- delivery.addTransfer(xfr);
- }
- else
- {
- endpoint.settle(delivery.getDeliveryTag());
- _outgoingUnsettled.remove(deliveryId);
- }
+ final UnsettledDelivery delivery = new UnsettledDelivery(xfr.getDeliveryTag(), endpoint);
+ _outgoingDeliveryRegistry.addDelivery(deliveryId, delivery);
}
}
- xfr.setDeliveryId(deliveryId);
+
_remoteIncomingWindow--;
try
{
@@ -461,42 +459,40 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
Role dispositionRole = disposition.getRole();
- LinkedHashMap<UnsignedInteger, Delivery> unsettledTransfers;
+ DeliveryRegistry unsettledDeliveries;
if(dispositionRole == Role.RECEIVER)
{
- unsettledTransfers = _outgoingUnsettled;
+ unsettledDeliveries = _outgoingDeliveryRegistry;
}
else
{
- unsettledTransfers = _incomingUnsettled;
-
+ unsettledDeliveries = _incomingDeliveryRegistry;
}
SequenceNumber deliveryId = new SequenceNumber(disposition.getFirst().intValue());
SequenceNumber last;
if(disposition.getLast() == null)
{
- last = deliveryId;
+ last = new SequenceNumber(deliveryId.intValue());
}
else
{
last = new SequenceNumber(disposition.getLast().intValue());
}
-
while(deliveryId.compareTo(last)<=0)
{
UnsignedInteger deliveryIdUnsigned = UnsignedInteger.valueOf(deliveryId.intValue());
- Delivery delivery = unsettledTransfers.get(deliveryIdUnsigned);
- if(delivery != null)
+ UnsettledDelivery unsettledDelivery = unsettledDeliveries.getDelivery(deliveryIdUnsigned);
+
+ if(unsettledDelivery != null)
{
- delivery.getLinkEndpoint().receiveDeliveryState(delivery,
- disposition.getState(),
- disposition.getSettled());
+ LinkEndpoint<?,?> linkEndpoint = unsettledDelivery.getLinkEndpoint();
+ linkEndpoint.receiveDeliveryState(unsettledDelivery.getDeliveryTag(), disposition.getState(), disposition.getSettled());
if (Boolean.TRUE.equals(disposition.getSettled()))
{
- unsettledTransfers.remove(deliveryIdUnsigned);
+ unsettledDeliveries.removeDelivery(deliveryIdUnsigned);
}
}
deliveryId.incr();
@@ -607,62 +603,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
else
{
AbstractReceivingLinkEndpoint endpoint = ((AbstractReceivingLinkEndpoint) linkEndpoint);
-
- UnsignedInteger deliveryId = transfer.getDeliveryId();
- if (deliveryId == null)
- {
- deliveryId = endpoint.getLastDeliveryId();
- }
-
- Delivery delivery = _incomingUnsettled.get(deliveryId);
- if (delivery == null)
- {
- delivery = new Delivery(transfer, endpoint);
- _incomingUnsettled.put(deliveryId, delivery);
-
- if (Boolean.TRUE.equals(transfer.getMore()))
- {
- endpoint.setLastDeliveryId(transfer.getDeliveryId());
- }
- }
- else
- {
- if (delivery.getDeliveryId().equals(deliveryId))
- {
- delivery.addTransfer(transfer);
-
- if (!Boolean.TRUE.equals(transfer.getMore()))
- {
- endpoint.setLastDeliveryId(null);
- }
- }
- else
- {
- End reply = new End();
-
- Error error = new Error();
- error.setCondition(AmqpError.ILLEGAL_STATE);
- error.setDescription("TRANSFER called on Session for link handle "
- + inputHandle
- + " with incorrect delivery id "
- + transfer.getDeliveryId());
- reply.setError(error);
- _connection.sendEnd(_sendingChannel, reply, true);
-
- return;
-
- }
- }
-
- Error error = endpoint.receiveTransfer(transfer, delivery);
- if(error != null)
- {
- endpoint.close(error);
- }
- if ((delivery.isComplete() && delivery.isSettled() || Boolean.TRUE.equals(transfer.getAborted())))
- {
- _incomingUnsettled.remove(deliveryId);
- }
+ endpoint.receiveTransfer(transfer);
}
}
@@ -1536,6 +1477,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
_endpointToOutputHandle.remove(linkEndpoint);
_associatedLinkEndpoints.remove(linkEndpoint);
+ if (linkEndpoint.getRole() == Role.RECEIVER)
+ {
+ getIncomingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
+ }
+ else
+ {
+ getOutgoingDeliveryRegistry().removeDeliveriesForLinkEndpoint(linkEndpoint);
+ }
}
private void detach(UnsignedInteger handle, Detach detach)
@@ -1614,6 +1563,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
return primaryDomain;
}
+ DeliveryRegistry getOutgoingDeliveryRegistry()
+ {
+ return _outgoingDeliveryRegistry;
+ }
+
+ DeliveryRegistry getIncomingDeliveryRegistry()
+ {
+ return _incomingDeliveryRegistry;
+ }
+
private class EndpointCreationCallback<T extends LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> implements FutureCallback<T>
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 96dbdc1..048fa20 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -22,7 +22,6 @@ package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -53,20 +52,14 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Target>
{
private static final Logger LOGGER = LoggerFactory.getLogger(StandardReceivingLinkEndpoint.class);
- private ArrayList<Transfer> _incompleteMessage;
- private boolean _resumedMessage;
- private Binary _messageDeliveryTag;
- private Map<Binary, Outcome> _unsettledMap = Collections.synchronizedMap(new HashMap<Binary, Outcome>());
private ReceivingDestination _receivingDestination;
public StandardReceivingLinkEndpoint(final Session_1_0 session,
@@ -89,107 +82,30 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
@Override
- protected Error messageTransfer(Transfer xfr)
+ protected Error receiveDelivery(Delivery delivery)
{
- List<QpidByteBuffer> fragments = null;
-
- org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState();
- final Binary deliveryTag = xfr.getDeliveryTag();
- UnsignedInteger messageFormat = null;
- ReceiverSettleMode transferReceiverSettleMode = null;
- Error error = null;
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<>();
- _incompleteMessage.add(xfr);
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = deliveryTag;
- return null;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return null;
- }
-
- fragments = new ArrayList<>(_incompleteMessage.size());
-
- for (Transfer t : _incompleteMessage)
- {
- if (t.getMessageFormat() != null && messageFormat == null)
- {
- messageFormat = t.getMessageFormat();
- }
-
- if (t.getRcvSettleMode() != null)
- {
- if (transferReceiverSettleMode == null)
- {
- transferReceiverSettleMode = t.getRcvSettleMode();
- }
- else if (!transferReceiverSettleMode.equals(t.getRcvSettleMode()))
- {
- error = new Error(AmqpError.INVALID_FIELD,
- "Transfer \"rcv-settle-mode\" is set to different value than on previous transfer.");
- break;
- }
- }
- fragments.addAll(t.getPayload());
- t.dispose();
- }
- _incompleteMessage=null;
-
- }
- else
- {
- _resumedMessage = Boolean.TRUE.equals(xfr.getResume());
- _messageDeliveryTag = deliveryTag;
- fragments = xfr.getPayload();
- messageFormat = xfr.getMessageFormat();
- transferReceiverSettleMode = xfr.getRcvSettleMode();
- xfr.dispose();
- }
-
- if (error == null && !ReceiverSettleMode.SECOND.equals(getReceivingSettlementMode())
- && ReceiverSettleMode.SECOND.equals(transferReceiverSettleMode))
- {
- error = new Error(AmqpError.INVALID_FIELD,
- "Transfer \"rcv-settle-mode\" cannot be \"first\" when link \"rcv-settle-mode\" is set to \"second\".");
-
- }
-
- if (error != null)
- {
- for (QpidByteBuffer fragment : fragments)
- {
- fragment.dispose();
- }
- return error;
- }
+ ReceiverSettleMode transferReceiverSettleMode = delivery.getReceiverSettleMode();
- if(_resumedMessage)
+ if(delivery.getResume())
{
- if(_unsettledMap.containsKey(_messageDeliveryTag))
+ DeliveryState deliveryState = _unsettled.get(delivery.getDeliveryTag());
+ if (deliveryState instanceof Outcome)
{
- Outcome outcome = _unsettledMap.get(_messageDeliveryTag);
boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
- updateDisposition(_messageDeliveryTag, (DeliveryState) outcome, settled);
- if(settled)
- {
- _unsettledMap.remove(_messageDeliveryTag);
- }
+ updateDisposition(delivery.getDeliveryTag(), deliveryState, settled);
+ return null;
}
else
{
- throw new ConnectionScopedRuntimeException("Unexpected delivery Tag: " + _messageDeliveryTag + "_unsettledMap: " + _unsettledMap);
+ // TODO: create message ?
}
}
else
{
ServerMessage<?> serverMessage;
-
+ UnsignedInteger messageFormat = delivery.getMessageFormat();
+ org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = delivery.getState();
+ List<QpidByteBuffer> fragments = delivery.getPayload();
MessageFormat format = MessageFormatRegistry.getFormat(messageFormat == null ? 0 : messageFormat.intValue());
if(format != null)
{
@@ -279,7 +195,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
{
if (transactionId == null)
{
- resultantState = (DeliveryState) outcome;
+ resultantState = outcome;
}
else
{
@@ -302,12 +218,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
boolean settled = shouldReceiverSettleFirst(transferReceiverSettleMode);
- if (!settled)
- {
- _unsettledMap.put(deliveryTag, outcome);
- }
-
- updateDisposition(deliveryTag, resultantState, settled);
+ updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
getSession().getAMQPConnection()
.registerMessageReceived(serverMessage.getSize(), arrivalTime);
@@ -319,12 +230,12 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
{
public void postCommit()
{
- updateDisposition(deliveryTag, null, true);
+ updateDisposition(delivery.getDeliveryTag(), null, true);
}
public void onRollback()
{
- updateDisposition(deliveryTag, null, true);
+ updateDisposition(delivery.getDeliveryTag(), null, true);
}
});
}
@@ -375,16 +286,13 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
}
-
@Override
- protected void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
+ protected Map<Binary, DeliveryState> getLocalUnsettled()
{
- if(Boolean.TRUE.equals(settled))
- {
- _unsettledMap.remove(deliveryTag);
- }
+ return new HashMap<>(_unsettled);
}
+
@Override
public void attachReceived(final Attach attach) throws AmqpErrorException
{
@@ -433,26 +341,20 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
setCapabilities(targetCapabilities);
setDestination(destination);
- Map initialUnsettledMap = getInitialUnsettledMap();
- Map<Binary, Outcome> unsettledCopy = new HashMap<Binary, Outcome>(_unsettledMap);
- for(Map.Entry<Binary, Outcome> entry : unsettledCopy.entrySet())
+ Map remoteUnsettled = attach.getUnsettled();
+ Map<Binary, DeliveryState> unsettledCopy = new HashMap<>(_unsettled);
+ for(Map.Entry<Binary, DeliveryState> entry : unsettledCopy.entrySet())
{
Binary deliveryTag = entry.getKey();
- if(!initialUnsettledMap.containsKey(deliveryTag))
+ if(remoteUnsettled == null || !remoteUnsettled.containsKey(deliveryTag))
{
- _unsettledMap.remove(deliveryTag);
+ _unsettled.remove(deliveryTag); // todo: removal is based on assumption that remote unsettled map is complete
}
}
getLink().setTermini(source, target);
}
- @Override
- public void initialiseUnsettled()
- {
- _localUnsettled = new HashMap(_unsettledMap);
- }
-
public ReceivingDestination getReceivingDestination()
{
return _receivingDestination;
@@ -515,7 +417,6 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
attachReceived(attach);
- initialiseUnsettled();
}
@Override
@@ -528,21 +429,4 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
attachReceived(attach);
}
-
- @Override
- protected void detach(Error error, boolean close)
- {
- super.detach(error, close);
-
- if (_incompleteMessage != null)
- {
- for (Transfer t : _incompleteMessage)
- {
- t.dispose();
- }
- _incompleteMessage = null;
- }
- _messageDeliveryTag = null;
- _resumedMessage = false;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 25dd8bd..108943d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -19,8 +19,8 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -44,7 +44,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
@@ -52,7 +51,6 @@ import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint<Coordinator>
{
private final LinkedHashMap<Integer, ServerTransaction> _createdTransactions = new LinkedHashMap<>();
- private ArrayList<Transfer> _incompleteMessage;
public TxnCoordinatorReceivingLinkEndpoint(final Session_1_0 session, final Link_1_0<Source, Coordinator> link)
{
@@ -67,43 +65,11 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
}
@Override
- protected Error messageTransfer(Transfer xfr)
+ protected Error receiveDelivery(Delivery delivery)
{
- List<QpidByteBuffer> payload = new ArrayList<>();
+ List<QpidByteBuffer> payload = delivery.getPayload();
- final Binary deliveryTag = xfr.getDeliveryTag();
- if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null)
- {
- _incompleteMessage = new ArrayList<Transfer>();
- _incompleteMessage.add(xfr);
- return null;
- }
- else if(_incompleteMessage != null)
- {
- _incompleteMessage.add(xfr);
- if(Boolean.TRUE.equals(xfr.getMore()))
- {
- return null;
- }
-
- for(Transfer t : _incompleteMessage)
- {
- final List<QpidByteBuffer> bufs = t.getPayload();
- if(bufs != null)
- {
- payload.addAll(bufs);
- }
- t.dispose();
- }
- _incompleteMessage=null;
-
- }
- else
- {
- payload.addAll(xfr.getPayload());
- xfr.dispose();
- }
// Only interested in the amqp-value section that holds the message to the coordinator
try
@@ -132,7 +98,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
session.incrementStartedTransactions();
state.setTxnId(session.integerToBinary(txn.getId()));
- updateDisposition(deliveryTag, state, true);
+ updateDisposition(delivery.getDeliveryTag(), state, true);
}
else if(command instanceof Discharge)
@@ -159,7 +125,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
if (error == null)
{
- updateDisposition(deliveryTag, outcome, true);
+ updateDisposition(delivery.getDeliveryTag(), outcome, true);
}
return error;
}
@@ -250,6 +216,12 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
}
@Override
+ protected Map<Binary, DeliveryState> getLocalUnsettled()
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
protected void reattachLink(final Attach attach) throws AmqpErrorException
{
throw new AmqpErrorException(new Error(AmqpError.NOT_IMPLEMENTED, "Cannot reattach a Coordinator Link."));
@@ -283,20 +255,10 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
}
@Override
- protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
- {
-
- }
-
- @Override
public void attachReceived(final Attach attach) throws AmqpErrorException
{
super.attachReceived(attach);
setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
}
- @Override
- public void initialiseUnsettled()
- {
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
index 7a47703..55588a1 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ValueHandler.java
@@ -131,7 +131,7 @@ public class ValueHandler implements DescribedTypeConstructorRegistry.Source
{
position--;
}
- originalPositions[i] = position;
+ originalPositions[i - firstBufferWithAvailable] = position;
}
Object descriptor = parse(in);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
new file mode 100644
index 0000000..8a054fd
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistry.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public interface DeliveryRegistry
+{
+ void addDelivery(UnsignedInteger deliveryId, UnsettledDelivery unsettledDelivery);
+ void removeDelivery(UnsignedInteger deliveryId);
+ UnsettledDelivery getDelivery(UnsignedInteger deliveryId);
+ void removeDeliveriesForLinkEndpoint(LinkEndpoint<?, ?> linkEndpoint);
+ UnsignedInteger getDeliveryIdByTag(Binary deliveryTag);
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/2e8efc0a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
new file mode 100644
index 0000000..466164f
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/delivery/DeliveryRegistryImpl.java
@@ -0,0 +1,79 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0.delivery;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.protocol.v1_0.LinkEndpoint;
+import org.apache.qpid.server.protocol.v1_0.type.Binary;
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public class DeliveryRegistryImpl implements DeliveryRegistry
+{
+ private final Map<UnsignedInteger, UnsettledDelivery> _deliveries = new ConcurrentHashMap<>();
+ private final Map<Binary, UnsignedInteger> _deliveryIds = new ConcurrentHashMap<>();
+
+ @Override
+ public void addDelivery(final UnsignedInteger deliveryId, final UnsettledDelivery unsettledDelivery)
+ {
+ _deliveries.put(deliveryId, unsettledDelivery);
+ _deliveryIds.put(unsettledDelivery.getDeliveryTag(), deliveryId);
+ }
+
+ @Override
+ public void removeDelivery(final UnsignedInteger deliveryId)
+ {
+ UnsettledDelivery unsettledDelivery = _deliveries.remove(deliveryId);
+ if (unsettledDelivery != null)
+ {
+ _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+ }
+ }
+
+ @Override
+ public UnsettledDelivery getDelivery(final UnsignedInteger deliveryId)
+ {
+ return _deliveries.get(deliveryId);
+ }
+
+ @Override
+ public void removeDeliveriesForLinkEndpoint(final LinkEndpoint<?, ?> linkEndpoint)
+ {
+ Iterator<UnsettledDelivery> iterator = _deliveries.values().iterator();
+ while (iterator.hasNext())
+ {
+ UnsettledDelivery unsettledDelivery = iterator.next();
+ if (unsettledDelivery.getLinkEndpoint() == linkEndpoint)
+ {
+ iterator.remove();
+ _deliveryIds.remove(unsettledDelivery.getDeliveryTag());
+ }
+ }
+ }
+
+ @Override
+ public UnsignedInteger getDeliveryIdByTag(final Binary deliveryTag)
+ {
+ return _deliveryIds.get(deliveryTag);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org