You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/02/16 16:11:27 UTC
svn commit: r1783244 [1/2] -
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/
Author: lquack
Date: Thu Feb 16 16:11:27 2017
New Revision: 1783244
URL: http://svn.apache.org/viewvc?rev=1783244&view=rev
Log:
QPID-7656: [Java Broker] Move functionality from Link to LinkEndpoint
Added:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
- copied, changed from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
- copied, changed from r1783242, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
Removed:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLink_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Thu Feb 16 16:11:27 2017
@@ -70,41 +70,40 @@ class ConsumerTarget_1_0 extends Abstrac
{
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_1_0.class);
private final boolean _acquires;
- private final SendingLink_1_0 _link;
private long _deliveryTag = 0L;
private Binary _transactionId;
private final AMQPDescribedTypeRegistry _typeRegistry;
+ private SendingLinkEndpoint _linkEndpoint;
private final SectionEncoder _sectionEncoder;
private boolean _queueEmpty;
- public ConsumerTarget_1_0(final SendingLink_1_0 link,
- boolean acquires)
+ public ConsumerTarget_1_0(final SendingLinkEndpoint linkEndpoint, boolean acquires)
{
- super(false, link.getSession().getAMQPConnection());
- _link = link;
- _typeRegistry = link.getEndpoint().getSession().getConnection().getDescribedTypeRegistry();
+ super(false, linkEndpoint.getSession().getAMQPConnection());
+ _typeRegistry = linkEndpoint.getSession().getConnection().getDescribedTypeRegistry();
+ _linkEndpoint = linkEndpoint;
_sectionEncoder = new SectionEncoderImpl(_typeRegistry);
_acquires = acquires;
}
private SendingLinkEndpoint getEndpoint()
{
- return _link.getEndpoint();
+ return _linkEndpoint;
}
@Override
public void updateNotifyWorkDesired()
{
boolean state = false;
- Session_1_0 session = _link.getSession();
+ Session_1_0 session = _linkEndpoint.getSession();
if (session != null)
{
final AMQPConnection<?> amqpConnection = session.getAMQPConnection();
state = !amqpConnection.isTransportBlockedForWriting()
- && _link.isAttached()
+ && _linkEndpoint.isAttached()
&& getEndpoint().hasCreditToSend();
}
setNotifyWorkDesired(state);
@@ -125,7 +124,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
converter =
(MessageConverter<? super ServerMessage, Message_1_0>) MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
- message = converter.convert(serverMessage, _link.getAddressSpace());
+ message = converter.convert(serverMessage, _linkEndpoint.getAddressSpace());
}
Transfer transfer = new Transfer();
@@ -199,7 +198,7 @@ class ConsumerTarget_1_0 extends Abstrac
transfer.setDeliveryTag(tag);
- if (_link.isAttached())
+ if (_linkEndpoint.isAttached())
{
if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode()))
{
@@ -209,9 +208,9 @@ class ConsumerTarget_1_0 extends Abstrac
{
UnsettledAction action = _acquires
? new DispositionAction(tag, entry, consumer)
- : new DoNothingAction(tag, entry);
+ : new DoNothingAction();
- _link.addUnsettled(tag, action, entry);
+ _linkEndpoint.addUnsettled(tag, action, entry);
}
if (_transactionId != null)
@@ -223,7 +222,7 @@ class ConsumerTarget_1_0 extends Abstrac
// TODO - need to deal with failure here
if (_acquires && _transactionId != null)
{
- ServerTransaction txn = _link.getTransaction(_transactionId);
+ ServerTransaction txn = _linkEndpoint.getTransaction(_transactionId);
if (txn != null)
{
txn.addPostTransactionAction(new ServerTransaction.Action()
@@ -236,7 +235,7 @@ class ConsumerTarget_1_0 extends Abstrac
public void onRollback()
{
entry.release(consumer);
- _link.getEndpoint().updateDisposition(tag, (DeliveryState) null, true);
+ _linkEndpoint.updateDisposition(tag, (DeliveryState) null, true);
}
});
}
@@ -289,14 +288,13 @@ class ConsumerTarget_1_0 extends Abstrac
public boolean allocateCredit(final ServerMessage msg)
{
ProtocolEngine protocolEngine = getSession().getConnection();
- final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend();
+ final boolean hasCredit = _linkEndpoint.isAttached() && getEndpoint().hasCreditToSend();
updateNotifyWorkDesired();
if (hasCredit)
{
- SendingLinkEndpoint linkEndpoint = _link.getEndpoint();
- linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
+ _linkEndpoint.setLinkCredit(_linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE));
}
return hasCredit;
@@ -305,14 +303,13 @@ class ConsumerTarget_1_0 extends Abstrac
public void restoreCredit(final ServerMessage message)
{
- final SendingLinkEndpoint endpoint = _link.getEndpoint();
- endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE));
+ _linkEndpoint.setLinkCredit(_linkEndpoint.getLinkCredit().add(UnsignedInteger.ONE));
updateNotifyWorkDesired();
}
public void queueEmpty()
{
- if(_link.drained())
+ if(_linkEndpoint.drained())
{
updateNotifyWorkDesired();
}
@@ -324,14 +321,14 @@ class ConsumerTarget_1_0 extends Abstrac
if (isSuspended() && getEndpoint() != null)
{
- _transactionId = _link.getTransactionId();
+ _transactionId = _linkEndpoint.getTransactionId();
}
}
@Override
public Session_1_0 getSession()
{
- return _link.getSession();
+ return _linkEndpoint.getSession();
}
public void flush()
@@ -369,7 +366,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
transactionId = ((TransactionalState)state).getTxnId();
outcome = ((TransactionalState)state).getOutcome();
- txn = _link.getTransaction(transactionId);
+ txn = _linkEndpoint.getTransaction(transactionId);
if(txn == null)
{
// TODO - invalid txn id supplied
@@ -416,13 +413,13 @@ class ConsumerTarget_1_0 extends Abstrac
{
if(Boolean.TRUE.equals(settled))
{
- _link.getEndpoint().settle(_deliveryTag);
+ _linkEndpoint.settle(_deliveryTag);
}
else
{
- _link.getEndpoint().updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
+ _linkEndpoint.updateDisposition(_deliveryTag, (DeliveryState) outcome, true);
}
- _link.getEndpoint().sendFlowConditional();
+ _linkEndpoint.sendFlowConditional();
}
@Override
@@ -445,13 +442,13 @@ class ConsumerTarget_1_0 extends Abstrac
{
_queueEntry.release(getConsumer());
- _link.getEndpoint().settle(_deliveryTag);
+ _linkEndpoint.settle(_deliveryTag);
}
@Override
public void onRollback()
{
- _link.getEndpoint().settle(_deliveryTag);
+ _linkEndpoint.settle(_deliveryTag);
// TODO: apply source's default outcome if settled
}
@@ -475,7 +472,7 @@ class ConsumerTarget_1_0 extends Abstrac
{
_queueEntry.release(getConsumer());
}
- _link.getEndpoint().settle(_deliveryTag);
+ _linkEndpoint.settle(_deliveryTag);
}
@Override
@@ -496,9 +493,9 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
public void postCommit()
{
- _link.getEndpoint().settle(_deliveryTag);
+ _linkEndpoint.settle(_deliveryTag);
incrementDeliveryCountOrRouteToAlternateOrDiscard();
- _link.getEndpoint().sendFlowConditional();
+ _linkEndpoint.sendFlowConditional();
}
@Override
@@ -520,8 +517,8 @@ class ConsumerTarget_1_0 extends Abstrac
{
final Modified modified = new Modified();
modified.setDeliveryFailed(true);
- _link.getEndpoint().updateDisposition(_deliveryTag, modified, true);
- _link.getEndpoint().sendFlowConditional();
+ _linkEndpoint.updateDisposition(_deliveryTag, modified, true);
+ _linkEndpoint.sendFlowConditional();
incrementDeliveryCountOrRouteToAlternateOrDiscard();
}
@@ -541,7 +538,7 @@ class ConsumerTarget_1_0 extends Abstrac
private void routeToAlternateOrDiscard()
{
- final Session_1_0 session = _link.getSession();
+ final Session_1_0 session = _linkEndpoint.getSession();
final ServerMessage message = _queueEntry.getMessage();
final EventLogger eventLogger = session.getEventLogger();
final LogSubject logSubject = session.getLogSubject();
@@ -590,25 +587,12 @@ class ConsumerTarget_1_0 extends Abstrac
private class DoNothingAction implements UnsettledAction
{
- public DoNothingAction(final Binary tag,
- final MessageInstance queueEntry)
+ public DoNothingAction()
{
}
public boolean process(final DeliveryState state, final Boolean settled)
{
- Binary transactionId = null;
- Outcome outcome = null;
- // If disposition is settled this overrides the txn?
- if(state instanceof TransactionalState)
- {
- transactionId = ((TransactionalState)state).getTxnId();
- outcome = ((TransactionalState)state).getOutcome();
- }
- else if (state instanceof Outcome)
- {
- outcome = (Outcome) state;
- }
return true;
}
}
@@ -621,9 +605,9 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
public String getTargetAddress()
{
- BaseTarget target = _link.getEndpoint().getTarget();
+ BaseTarget target = _linkEndpoint.getTarget();
- return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _link.getEndpoint().getName();
+ return target instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.Target ? ((org.apache.qpid.server.protocol.v1_0.type.messaging.Target) target).getAddress() : _linkEndpoint.getName();
}
@Override
@@ -643,6 +627,6 @@ class ConsumerTarget_1_0 extends Abstrac
@Override
public String toString()
{
- return "ConsumerTarget_1_0[linkSession=" + _link.getSession().toLogString() + "]";
+ return "ConsumerTarget_1_0[linkSession=" + _linkEndpoint.getSession().toLogString() + "]";
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -24,10 +24,10 @@ package org.apache.qpid.server.protocol.
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.protocol.v1_0.type.BaseSource;
import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
@@ -57,7 +57,7 @@ public abstract class LinkEndpoint<T ext
private volatile boolean _stoppedUpdated;
private Symbol[] _capabilities;
- private enum State
+ protected enum State
{
DETACHED,
ATTACH_SENT,
@@ -72,7 +72,7 @@ public abstract class LinkEndpoint<T ext
private Session_1_0 _session;
- private volatile State _state = State.DETACHED;
+ protected volatile State _state = State.DETACHED;
private BaseSource _source;
private BaseTarget _target;
@@ -84,8 +84,6 @@ public abstract class LinkEndpoint<T ext
private UnsignedLong _maxMessageSize;
private Map<Symbol, Object> _properties;
- private Map<Binary,Delivery> _unsettledTransfers = new HashMap<Binary,Delivery>();
-
LinkEndpoint(final Session_1_0 sessionEndpoint,final Attach attach)
{
_session = sessionEndpoint;
@@ -96,6 +94,8 @@ public abstract class LinkEndpoint<T ext
_state = State.ATTACH_RECVD;
}
+ public abstract void start();
+
public boolean isStopped()
{
return _stopped;
@@ -125,6 +125,11 @@ public abstract class LinkEndpoint<T ext
return _source;
}
+ public NamedAddressSpace getAddressSpace()
+ {
+ return getSession().getConnection().getAddressSpace();
+ }
+
public void setSource(final BaseSource source)
{
_source = source;
@@ -189,29 +194,26 @@ public abstract class LinkEndpoint<T ext
break;
case ATTACHED:
_state = State.DETACH_RECVD;
- _link.remoteDetached(LinkEndpoint.this, detach);
+ remoteDetachedPerformDetach(detach);
break;
}
}
+ protected abstract void remoteDetachedPerformDetach(final Detach detach);
+
public void receiveFlow(final Flow flow)
{
}
public void addUnsettled(final Delivery unsettled)
{
- _unsettledTransfers.put(unsettled.getDeliveryTag(), unsettled);
}
public void receiveDeliveryState(final Delivery unsettled,
final DeliveryState state,
final Boolean settled)
{
- // TODO
- if (_link != null)
- {
- _link.handle(unsettled.getDeliveryTag(), state, settled);
- }
+ handle(unsettled.getDeliveryTag(), state, settled);
if (Boolean.TRUE.equals(settled))
{
@@ -219,9 +221,11 @@ public abstract class LinkEndpoint<T ext
}
}
+ protected abstract void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
+
public void settle(final Binary deliveryTag)
{
- _unsettledTransfers.remove(deliveryTag);
+
}
void setLocalHandle(final UnsignedInteger localHandle)
@@ -289,6 +293,11 @@ public abstract class LinkEndpoint<T ext
return _session;
}
+ public void setSession(final Session_1_0 session)
+ {
+ _session = session;
+ }
+
UnsignedInteger getLocalHandle()
{
return _localHandle;
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Link_1_0.java Thu Feb 16 16:11:27 2017
@@ -28,9 +28,4 @@ import org.apache.qpid.server.protocol.v
public interface Link_1_0 extends LinkModel
{
- void remoteDetached(final LinkEndpoint endpoint, Detach detach);
-
- void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled);
-
- void start();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -37,11 +37,12 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLink_1_0>
+public abstract class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLink_1_0>
{
private UnsignedInteger _lastDeliveryId;
+ private ReceivingDestination _receivingDestination;
private static class TransientState
{
@@ -144,7 +145,7 @@ public class ReceivingLinkEndpoint exten
{
_unsettledMap.remove(deliveryTag);
}
- return getLink().messageTransfer(transfer);
+ return messageTransfer(transfer);
}
else
{
@@ -153,6 +154,18 @@ public class ReceivingLinkEndpoint exten
}
}
+ protected abstract Error messageTransfer(final Transfer transfer);
+
+ public ReceivingDestination getReceivingDestination()
+ {
+ return _receivingDestination;
+ }
+
+ public void setDestination(final ReceivingDestination receivingDestination)
+ {
+ _receivingDestination = receivingDestination;
+ }
+
@Override public void receiveFlow(final Flow flow)
{
super.receiveFlow(flow);
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -25,5 +25,5 @@ import org.apache.qpid.server.protocol.v
public interface ReceivingLink_1_0 extends Link_1_0
{
- Error messageTransfer(Transfer xfr);
+ void setLinkAttachmentToNull();
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Thu Feb 16 16:11:27 2017
@@ -21,39 +21,231 @@
package org.apache.qpid.server.protocol.v1_0;
+import java.security.AccessControlException;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.server.consumer.ConsumerOption;
+import org.apache.qpid.server.filter.FilterManager;
+import org.apache.qpid.server.filter.JMSSelectorFilter;
+import org.apache.qpid.server.filter.SelectorParsingException;
+import org.apache.qpid.server.filter.selector.ParseException;
+import org.apache.qpid.server.filter.selector.TokenMgrError;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.MessageInstanceConsumer;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.NamedAddressSpace;
+import org.apache.qpid.server.model.NotFoundException;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
+import org.apache.qpid.server.protocol.v1_0.type.Outcome;
import org.apache.qpid.server.protocol.v1_0.type.Symbol;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
import org.apache.qpid.server.protocol.v1_0.type.transport.Attach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
import org.apache.qpid.server.protocol.v1_0.type.transport.Flow;
import org.apache.qpid.server.protocol.v1_0.type.transport.Role;
import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
+import org.apache.qpid.server.txn.AutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class SendingLinkEndpoint extends LinkEndpoint<SendingLink_1_0>
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(SendingLinkEndpoint.class);
public static final Symbol PRIORITY = Symbol.valueOf("priority");
private UnsignedInteger _lastDeliveryId;
private Binary _lastDeliveryTag;
- private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<Binary, UnsignedInteger>();
+ private Map<Binary, UnsignedInteger> _unsettledMap = new HashMap<>();
+ private Map<Binary, MessageInstance> _unsettledMap2 = new HashMap<>();
private Binary _transactionId;
private Integer _priority;
+ private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
+ private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
+ private volatile boolean _draining = false;
+ private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
+ private TerminusDurability _durability;
+ private SendingDestination _destination;
+ private EnumSet<ConsumerOption> _consumerOptions;
+ private FilterManager _consumerFilters;
+ private ConsumerTarget_1_0 _consumerTarget;
+ private MessageInstanceConsumer<ConsumerTarget_1_0> _consumer;
-
- public SendingLinkEndpoint(final Session_1_0 sessionEndpoint, final Attach attach)
+ public SendingLinkEndpoint(final Session_1_0 session, final Attach attach)
{
- super(sessionEndpoint, attach);
+ super(session, attach);
setSendingSettlementMode(attach.getSndSettleMode());
setReceivingSettlementMode(attach.getRcvSettleMode());
init();
}
@Override
+ public void start()
+ {
+ }
+
+ public void doStuff(final SendingDestination destination) throws AmqpErrorException
+ {
+ _destination = destination;
+ final Source source = (Source) getSource();
+
+ EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
+
+ boolean noLocal = false;
+ JMSSelectorFilter messageFilter = null;
+
+ if(destination instanceof ExchangeDestination)
+ {
+ options.add(ConsumerOption.ACQUIRES);
+ options.add(ConsumerOption.SEES_REQUEUES);
+ }
+ else if(destination instanceof MessageSourceDestination)
+ {
+ MessageSource messageSource = _destination.getMessageSource();
+
+ if(messageSource instanceof Queue && ((Queue<?>)messageSource).getAvailableAttributes().contains("topic"))
+ {
+ source.setDistributionMode(StdDistMode.COPY);
+ }
+
+ Map<Symbol,Filter> filters = source.getFilter();
+
+ Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
+
+ if(filters != null)
+ {
+ for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
+ {
+ if(entry.getValue() instanceof NoLocalFilter)
+ {
+ actualFilters.put(entry.getKey(), entry.getValue());
+ noLocal = true;
+ }
+ else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
+ {
+
+ org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
+ try
+ {
+ messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
+
+ actualFilters.put(entry.getKey(), entry.getValue());
+ }
+ catch (ParseException | SelectorParsingException | TokenMgrError e)
+ {
+ Error error = new Error();
+ error.setCondition(AmqpError.INVALID_FIELD);
+ error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
+ error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
+ throw new AmqpErrorException(error);
+ }
+
+
+ }
+ }
+ }
+ source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
+
+ if(source.getDistributionMode() != StdDistMode.COPY)
+ {
+ options.add(ConsumerOption.ACQUIRES);
+ options.add(ConsumerOption.SEES_REQUEUES);
+ }
+ }
+ else
+ {
+ throw new ConnectionScopedRuntimeException("Unknown destination type");
+ }
+ if(noLocal)
+ {
+ options.add(ConsumerOption.NO_LOCAL);
+ }
+
+ FilterManager filters = null;
+ if(messageFilter != null)
+ {
+ filters = new FilterManager();
+ filters.add(messageFilter.getName(), messageFilter);
+ }
+ _consumerOptions = options;
+ _consumerFilters = filters;
+ }
+
+ void createConsumerTarget() throws AmqpErrorException
+ {
+ final Source source = (Source) getSource();
+ _consumerTarget = new ConsumerTarget_1_0(this,
+ _destination instanceof ExchangeDestination ? true : source.getDistributionMode() != StdDistMode.COPY);
+ try
+ {
+ final String name;
+ if(getTarget() instanceof Target)
+ {
+ Target target = (Target) getTarget();
+ name = target.getAddress() == null ? getName() : target.getAddress();
+ }
+ else
+ {
+ name = getName();
+ }
+
+ _consumer = _destination.getMessageSource()
+ .addConsumer(_consumerTarget,
+ _consumerFilters,
+ Message_1_0.class,
+ name,
+ _consumerOptions,
+ getPriority());
+ _consumerTarget.updateNotifyWorkDesired();
+ }
+ catch (MessageSource.ExistingExclusiveConsumer e)
+ {
+ String msg = "Cannot add a consumer to the destination as there is already an exclusive consumer";
+ throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
+ }
+ catch (MessageSource.ExistingConsumerPreventsExclusive e)
+ {
+ String msg = "Cannot add an exclusive consumer to the destination as there is already a consumer";
+ throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
+ }
+ catch (MessageSource.ConsumerAccessRefused e)
+ {
+ String msg = "Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy";
+ throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
+ }
+ catch (MessageSource.QueueDeleted e)
+ {
+ String msg = "Cannot add a consumer to the destination as the destination has been deleted";
+ throw new AmqpErrorException(new Error(AmqpError.RESOURCE_DELETED, msg), e);
+ }
+ }
+
+
+ @Override
protected Map<Symbol, Object> initProperties(final Attach attach)
{
@@ -98,7 +290,8 @@ public class SendingLinkEndpoint extends
setAvailable(UnsignedInteger.valueOf(0));
}
- @Override public Role getRole()
+ @Override
+ public Role getRole()
{
return Role.SENDER;
}
@@ -108,6 +301,11 @@ public class SendingLinkEndpoint extends
return _priority;
}
+ public void setDurability(final TerminusDurability durability)
+ {
+ _durability = durability;
+ }
+
public boolean transfer(final Transfer xfr, final boolean decrementCredit)
{
Session_1_0 s = getSession();
@@ -141,11 +339,20 @@ public class SendingLinkEndpoint extends
}
- public void drained()
+ public boolean drained()
{
- setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
- setLinkCredit(UnsignedInteger.ZERO);
- sendFlow();
+ if (_draining)
+ {
+ setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+ setLinkCredit(UnsignedInteger.ZERO);
+ sendFlow();
+ _draining = false;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
@Override
@@ -185,27 +392,142 @@ public class SendingLinkEndpoint extends
@Override
public void flowStateChanged()
{
- getLink().flowStateChanged();
+ if(Boolean.TRUE.equals(getDrain()) && getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
+ {
+ _draining = true;
+ getConsumerTarget().flush();
+ }
+
+ while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())
+ {
+ Accepted accepted = new Accepted();
+ Transfer xfr = new Transfer();
+ Binary dt = _resumeAcceptedTransfers.remove(0);
+ xfr.setDeliveryTag(dt);
+ xfr.setState(accepted);
+ xfr.setResume(Boolean.TRUE);
+ transfer(xfr, true);
+ xfr.dispose();
+ }
+ if(_resumeAcceptedTransfers.isEmpty())
+ {
+ getConsumerTarget().flowStateChanged();
+ }
}
- public boolean hasCreditToSend()
+
+ @Override
+ protected void remoteDetachedPerformDetach(final Detach detach)
{
- UnsignedInteger linkCredit = getLinkCredit();
- return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
- && getSession().hasCreditToSend();
+ getConsumerTarget().close();
+ //TODO
+ // if not durable or close
+ if (Boolean.TRUE.equals(detach.getClosed())
+ || !(TerminusDurability.UNSETTLED_STATE.equals(_durability) || TerminusDurability.CONFIGURATION.equals(
+ _durability)))
+ {
+
+ Modified state = new Modified();
+ state.setDeliveryFailed(true);
+
+ for(UnsettledAction action : _unsettledActionMap.values())
+ {
+ action.process(state,Boolean.TRUE);
+ }
+ _unsettledActionMap.clear();
+
+ close();
+
+ if (getDestination() instanceof ExchangeDestination
+ && (_durability == TerminusDurability.CONFIGURATION
+ || _durability == TerminusDurability.UNSETTLED_STATE))
+ {
+ try
+ {
+ if (getSession().getConnection().getAddressSpace() instanceof QueueManagingVirtualHost)
+ {
+ ((QueueManagingVirtualHost) getSession().getConnection().getAddressSpace()).removeSubscriptionQueue(((ExchangeDestination) getDestination()).getQueue().getName());
+ }
+ }
+ catch (AccessControlException e)
+ {
+ LOGGER.error("Error unregistering subscription", e);
+ detach(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
+ }
+ catch (IllegalStateException e)
+ {
+ detach(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
+ }
+ catch (NotFoundException e)
+ {
+ detach(new Error(AmqpError.NOT_FOUND, e.getMessage()));
+ }
+ }
+ }
+ else if (detach.getError() != null && !getSession().isSyntheticError(detach.getError()))
+ {
+ try
+ {
+ getLink().setLinkAttachment(null, null);
+ }
+ catch (AmqpErrorException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+ getConsumerTarget().flowStateChanged();
+ detach();
+ }
+ else
+ {
+ detach();
+ getConsumerTarget().updateNotifyWorkDesired();
+ }
+ }
+
+ public void addUnsettled(final Binary tag, final UnsettledAction unsettledAction, final MessageInstance queueEntry)
+ {
+ _unsettledActionMap.put(tag, unsettledAction);
+ if(getTransactionId() == null)
+ {
+ _unsettledMap2.put(tag, queueEntry);
+ }
+
}
- public void receiveDeliveryState(final Delivery unsettled,
- final DeliveryState state,
- final Boolean settled)
+ @Override
+ protected void handle(final Binary deliveryTag, final DeliveryState state, final Boolean settled)
{
- super.receiveDeliveryState(unsettled, state, settled);
- if(settled)
+ UnsettledAction action = _unsettledActionMap.get(deliveryTag);
+ boolean localSettle = false;
+ if(action != null)
+ {
+ localSettle = action.process(state, settled);
+ if(localSettle && !Boolean.TRUE.equals(settled))
+ {
+ updateDisposition(deliveryTag, state, true);
+ }
+ }
+ if(Boolean.TRUE.equals(settled) || localSettle)
{
- _unsettledMap.remove(unsettled.getDeliveryTag());
+ _unsettledActionMap.remove(deliveryTag);
+ _unsettledMap.remove(deliveryTag);
+ _unsettledMap2.remove(deliveryTag);
}
}
+ public ServerTransaction getTransaction(Binary transactionId)
+ {
+ Session_1_0 session = getSession();
+ return session == null ? null : session.getTransaction(transactionId);
+ }
+
+ public boolean hasCreditToSend()
+ {
+ UnsignedInteger linkCredit = getLinkCredit();
+ return linkCredit != null && (linkCredit.compareTo(UnsignedInteger.valueOf(0)) > 0)
+ && getSession().hasCreditToSend();
+ }
+
public UnsignedInteger getLastDeliveryId()
{
return _lastDeliveryId;
@@ -219,9 +541,9 @@ public class SendingLinkEndpoint extends
public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled)
{
UnsignedInteger deliveryId;
- if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null)
+ if (settled && (deliveryId = _unsettledMap.remove(deliveryTag)) != null)
{
- settle(deliveryTag);
+ _unsettledMap2.remove(deliveryTag);
getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled);
}
}
@@ -231,4 +553,119 @@ public class SendingLinkEndpoint extends
return _transactionId;
}
+ public void doLinkAttachment(final Session_1_0 session, final MessageInstanceConsumer consumer) throws AmqpErrorException
+ {
+ if (session != null)
+ {
+ createConsumerTarget();
+
+ setSession(session);
+ _resumeAcceptedTransfers.clear();
+ _resumeFullTransfers.clear();
+ final NamedAddressSpace addressSpace = getSession().getConnection().getAddressSpace();
+ Map<Binary, MessageInstance> unsettledCopy = new HashMap<>(_unsettledMap2);
+ Map initialUnsettledMap = getInitialUnsettledMap();
+
+ for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
+ {
+ Binary deliveryTag = entry.getKey();
+ final MessageInstance queueEntry = entry.getValue();
+ if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
+ {
+ queueEntry.setRedelivered();
+ queueEntry.release(consumer);
+ _unsettledMap2.remove(deliveryTag);
+ }
+ else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
+ {
+ Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
+
+ if (outcome instanceof Accepted)
+ {
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+ if (consumer.acquires())
+ {
+ if (queueEntry.acquire() || queueEntry.isAcquired())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.delete();
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+ }
+ }
+ else if (outcome instanceof Released)
+ {
+ AutoCommitTransaction txn = new AutoCommitTransaction(addressSpace.getMessageStore());
+ if (consumer.acquires())
+ {
+ txn.dequeue(Collections.singleton(queueEntry),
+ new ServerTransaction.Action()
+ {
+ public void postCommit()
+ {
+ queueEntry.release(consumer);
+ }
+
+ public void onRollback()
+ {
+ }
+ });
+ }
+ }
+ //_unsettledMap.remove(deliveryTag);
+ initialUnsettledMap.remove(deliveryTag);
+ _resumeAcceptedTransfers.add(deliveryTag);
+ }
+ else
+ {
+ _resumeFullTransfers.add(queueEntry);
+ // exists in receivers map, but not yet got an outcome ... should resend with resume = true
+ }
+ // TODO - else
+ }
+ }
+
+ getConsumerTarget().updateNotifyWorkDesired();
+ }
+
+ public Map<Binary, MessageInstance> getUnsettledOutcomeMap()
+ {
+ Map<Binary, MessageInstance> unsettled = new HashMap<>(_unsettledMap2);
+
+ for (Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
+ {
+ entry.setValue(null);
+ }
+
+ return unsettled;
+ }
+
+ public MessageInstanceConsumer<ConsumerTarget_1_0> getConsumer()
+ {
+ return _consumer;
+ }
+
+ public ConsumerTarget_1_0 getConsumerTarget()
+ {
+ return _consumerTarget;
+ }
+
+ public SendingDestination getDestination()
+ {
+ return _destination;
+ }
+
+ public void setDestination(final SendingDestination destination)
+ {
+ _destination = destination;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Thu Feb 16 16:11:27 2017
@@ -20,541 +20,28 @@
*/
package org.apache.qpid.server.protocol.v1_0;
-import java.security.AccessControlException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.qpid.server.filter.SelectorParsingException;
-import org.apache.qpid.server.filter.selector.ParseException;
-import org.apache.qpid.server.filter.selector.TokenMgrError;
-import org.apache.qpid.server.consumer.ConsumerOption;
-import org.apache.qpid.server.filter.FilterManager;
-import org.apache.qpid.server.filter.JMSSelectorFilter;
-import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.message.MessageInstanceConsumer;
-import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.NamedAddressSpace;
-import org.apache.qpid.server.model.NotFoundException;
-import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.Binary;
-import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
-import org.apache.qpid.server.protocol.v1_0.type.Outcome;
-import org.apache.qpid.server.protocol.v1_0.type.Symbol;
-import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Released;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Source;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.Target;
-import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Detach;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer;
-import org.apache.qpid.server.txn.AutoCommitTransaction;
-import org.apache.qpid.server.txn.ServerTransaction;
-import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
public class SendingLink_1_0 implements Link_1_0
{
- private static final Logger _logger = LoggerFactory.getLogger(SendingLink_1_0.class);
- private final EnumSet<ConsumerOption> _consumerOptions;
- private final FilterManager _consumerFilters;
-
- private NamedAddressSpace _addressSpace;
- private volatile SendingDestination _destination;
-
- private MessageInstanceConsumer _consumer;
- private ConsumerTarget_1_0 _target;
-
- private boolean _draining;
- private final Map<Binary, MessageInstance> _unsettledMap =
- new HashMap<Binary, MessageInstance>();
-
- private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap =
- new ConcurrentHashMap<Binary, UnsettledAction>();
- private volatile SendingLinkAttachment _linkAttachment;
- private TerminusDurability _durability;
- private List<MessageInstance> _resumeFullTransfers = new ArrayList<MessageInstance>();
- private List<Binary> _resumeAcceptedTransfers = new ArrayList<Binary>();
- private Runnable _closeAction;
-
-
- public SendingLink_1_0(final SendingLinkAttachment linkAttachment,
- final NamedAddressSpace addressSpace,
- final SendingDestination destination)
- throws AmqpErrorException
- {
- _addressSpace = addressSpace;
- _destination = destination;
- _linkAttachment = linkAttachment;
- final Source source = (Source) linkAttachment.getSource();
- _durability = source.getDurable();
-
- EnumSet<ConsumerOption> options = EnumSet.noneOf(ConsumerOption.class);
-
- boolean noLocal = false;
- JMSSelectorFilter messageFilter = null;
-
- if(destination instanceof ExchangeDestination)
- {
- options.add(ConsumerOption.ACQUIRES);
- options.add(ConsumerOption.SEES_REQUEUES);
- }
- else if(destination instanceof MessageSourceDestination)
- {
- MessageSource messageSource = _destination.getMessageSource();
-
- if(messageSource instanceof Queue && ((Queue<?>)messageSource).getAvailableAttributes().contains("topic"))
- {
- source.setDistributionMode(StdDistMode.COPY);
- }
-
- Map<Symbol,Filter> filters = source.getFilter();
-
- Map<Symbol,Filter> actualFilters = new HashMap<Symbol,Filter>();
-
- if(filters != null)
- {
- for(Map.Entry<Symbol,Filter> entry : filters.entrySet())
- {
- if(entry.getValue() instanceof NoLocalFilter)
- {
- actualFilters.put(entry.getKey(), entry.getValue());
- noLocal = true;
- }
- else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter)
- {
-
- org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue();
- try
- {
- messageFilter = new JMSSelectorFilter(selectorFilter.getValue());
-
- actualFilters.put(entry.getKey(), entry.getValue());
- }
- catch (ParseException | SelectorParsingException | TokenMgrError e)
- {
- Error error = new Error();
- error.setCondition(AmqpError.INVALID_FIELD);
- error.setDescription("Invalid JMS Selector: " + selectorFilter.getValue());
- error.setInfo(Collections.singletonMap(Symbol.valueOf("field"), Symbol.valueOf("filter")));
- throw new AmqpErrorException(error);
- }
-
-
- }
- }
- }
- source.setFilter(actualFilters.isEmpty() ? null : actualFilters);
-
- if(source.getDistributionMode() != StdDistMode.COPY)
- {
- options.add(ConsumerOption.ACQUIRES);
- options.add(ConsumerOption.SEES_REQUEUES);
- }
- }
- else
- {
- throw new ConnectionScopedRuntimeException("Unknown destination type");
- }
- if(noLocal)
- {
- options.add(ConsumerOption.NO_LOCAL);
- }
-
- FilterManager filters = null;
- if(messageFilter != null)
- {
- filters = new FilterManager();
- filters.add(messageFilter.getName(), messageFilter);
- }
- _consumerOptions = options;
- _consumerFilters = filters;
- }
-
- void createConsumerTarget() throws AmqpErrorException
- {
- final Source source = (Source) getEndpoint().getSource();
- _target = new ConsumerTarget_1_0(this, _destination instanceof ExchangeDestination ? true : source.getDistributionMode() != StdDistMode.COPY);
- try
- {
- final String name;
- if(getEndpoint().getTarget() instanceof Target)
- {
- Target target = (Target) getEndpoint().getTarget();
- name = target.getAddress() == null ? getEndpoint().getName() : target.getAddress();
- }
- else
- {
- name = getEndpoint().getName();
- }
-
- _consumer = _destination.getMessageSource()
- .addConsumer(_target,
- _consumerFilters,
- Message_1_0.class,
- name,
- _consumerOptions,
- getEndpoint().getPriority());
- _target.updateNotifyWorkDesired();
- }
- catch (MessageSource.ExistingExclusiveConsumer e)
- {
- String msg = "Cannot add a consumer to the destination as there is already an exclusive consumer";
- throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
- }
- catch (MessageSource.ExistingConsumerPreventsExclusive e)
- {
- String msg = "Cannot add an exclusive consumer to the destination as there is already a consumer";
- throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
- }
- catch (MessageSource.ConsumerAccessRefused e)
- {
- String msg = "Cannot add an exclusive consumer to the destination as there is an incompatible exclusivity policy";
- throw new AmqpErrorException(new Error(AmqpError.RESOURCE_LOCKED, msg), e);
- }
- catch (MessageSource.QueueDeleted e)
- {
- String msg = "Cannot add a consumer to the destination as the destination has been deleted";
- throw new AmqpErrorException(new Error(AmqpError.RESOURCE_DELETED, msg), e);
- }
- }
-
- public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
- {
- _target.close();
- //TODO
- // if not durable or close
- if (Boolean.TRUE.equals(detach.getClosed())
- || !(TerminusDurability.UNSETTLED_STATE.equals(_durability) || TerminusDurability.CONFIGURATION.equals( _durability)))
- {
-
- Modified state = new Modified();
- state.setDeliveryFailed(true);
- for(UnsettledAction action : _unsettledActionMap.values())
- {
- action.process(state,Boolean.TRUE);
- }
- _unsettledActionMap.clear();
+ private volatile SendingLinkEndpoint _linkEndpoint;
- endpoint.close();
- if(_destination instanceof ExchangeDestination
- && (_durability == TerminusDurability.CONFIGURATION
- || _durability == TerminusDurability.UNSETTLED_STATE))
- {
- try
- {
- if (getAddressSpace() instanceof QueueManagingVirtualHost)
- {
- ((QueueManagingVirtualHost) getAddressSpace()).removeSubscriptionQueue(((ExchangeDestination) _destination).getQueue().getName());
- }
- }
- catch (AccessControlException e)
- {
- _logger.error("Error unregistering subscription", e);
- endpoint.detach(new Error(AmqpError.NOT_ALLOWED, "Error unregistering subscription"));
- }
- catch (IllegalStateException e)
- {
- endpoint.detach(new Error(AmqpError.RESOURCE_LOCKED, e.getMessage()));
- }
- catch (NotFoundException e)
- {
- endpoint.detach(new Error(AmqpError.NOT_FOUND, e.getMessage()));
- }
- }
-
- if(_closeAction != null)
- {
- _closeAction.run();
- }
-
- }
- else if (detach.getError() != null && !_linkAttachment.getEndpoint()
- .getSession()
- .isSyntheticError(detach.getError()))
- {
- _linkAttachment = null;
- _target.flowStateChanged();
- }
- else
- {
- endpoint.detach();
- _target.updateNotifyWorkDesired();
- }
- }
-
- public void start()
+ public SendingLink_1_0(final SendingLinkEndpoint linkEndpoint)
{
- //TODO
+ _linkEndpoint = linkEndpoint;
}
public SendingLinkEndpoint getEndpoint()
{
- return _linkAttachment == null ? null : _linkAttachment.getEndpoint() ;
- }
-
- public Session_1_0 getSession()
- {
- return _linkAttachment == null ? null : _linkAttachment.getSession();
- }
-
- public void flowStateChanged()
- {
- if(Boolean.TRUE.equals(getEndpoint().getDrain())
- && hasCredit())
- {
- _draining = true;
- _target.flush();
- }
-
- while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend())
- {
- Accepted accepted = new Accepted();
- Transfer xfr = new Transfer();
- Binary dt = _resumeAcceptedTransfers.remove(0);
- xfr.setDeliveryTag(dt);
- xfr.setState(accepted);
- xfr.setResume(Boolean.TRUE);
- getEndpoint().transfer(xfr, true);
- xfr.dispose();
- }
- if(_resumeAcceptedTransfers.isEmpty())
- {
- _target.flowStateChanged();
- }
-
- }
-
- boolean hasCredit()
- {
- return getEndpoint().getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0;
- }
-
- public boolean isDraining()
- {
- return false; //TODO
- }
-
- public boolean drained()
- {
- if(getEndpoint() != null)
- {
- if (_draining)
- {
- //TODO
- getEndpoint().drained();
- _draining = false;
- return true;
- }
- else
- {
- return false;
- }
-
- }
- else
- {
- return false;
- }
- }
-
- public void addUnsettled(Binary tag, UnsettledAction unsettledAction, MessageInstance queueEntry)
- {
- _unsettledActionMap.put(tag,unsettledAction);
- if(getTransactionId() == null)
- {
- _unsettledMap.put(tag, queueEntry);
- }
- }
-
- public void removeUnsettled(Binary tag)
- {
- _unsettledActionMap.remove(tag);
- }
-
- public void handle(Binary deliveryTag, DeliveryState state, Boolean settled)
- {
- UnsettledAction action = _unsettledActionMap.get(deliveryTag);
- boolean localSettle = false;
- if(action != null)
- {
- localSettle = action.process(state, settled);
- if(localSettle && !Boolean.TRUE.equals(settled))
- {
- _linkAttachment.updateDisposition(deliveryTag, state, true);
- }
- }
- if(Boolean.TRUE.equals(settled) || localSettle)
- {
- _unsettledActionMap.remove(deliveryTag);
- _unsettledMap.remove(deliveryTag);
- }
- }
-
- ServerTransaction getTransaction(Binary transactionId)
- {
- Session_1_0 session = _linkAttachment.getSession();
- return session == null ? null : session.getTransaction(transactionId);
- }
-
- public Binary getTransactionId()
- {
- SendingLinkEndpoint endpoint = getEndpoint();
- return endpoint == null ? null : endpoint.getTransactionId();
- }
-
- public boolean isDetached()
- {
- return _linkAttachment == null || getEndpoint().isDetached();
- }
-
- public boolean isAttached()
- {
- return _linkAttachment != null && getEndpoint().isAttached();
- }
-
- public synchronized void setLinkAttachment(SendingLinkAttachment linkAttachment) throws AmqpErrorException
- {
- _linkAttachment = linkAttachment;
-
- if (linkAttachment.getSession() != null)
- {
- SendingLinkEndpoint endpoint = linkAttachment.getEndpoint();
- Map initialUnsettledMap = endpoint.getInitialUnsettledMap();
- Map<Binary, MessageInstance> unsettledCopy = new HashMap<Binary, MessageInstance>(_unsettledMap);
- _resumeAcceptedTransfers.clear();
- _resumeFullTransfers.clear();
-
- createConsumerTarget();
-
- for (Map.Entry<Binary, MessageInstance> entry : unsettledCopy.entrySet())
- {
- Binary deliveryTag = entry.getKey();
- final MessageInstance queueEntry = entry.getValue();
- if (initialUnsettledMap == null || !initialUnsettledMap.containsKey(deliveryTag))
- {
- queueEntry.setRedelivered();
- queueEntry.release(_consumer);
- _unsettledMap.remove(deliveryTag);
- }
- else if (initialUnsettledMap.get(deliveryTag) instanceof Outcome)
- {
- Outcome outcome = (Outcome) initialUnsettledMap.get(deliveryTag);
-
- if (outcome instanceof Accepted)
- {
- AutoCommitTransaction txn = new AutoCommitTransaction(_addressSpace.getMessageStore());
- if (_consumer.acquires())
- {
- if (queueEntry.acquire() || queueEntry.isAcquired())
- {
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.delete();
- }
-
- public void onRollback()
- {
- }
- });
- }
- }
- }
- else if (outcome instanceof Released)
- {
- AutoCommitTransaction txn = new AutoCommitTransaction(_addressSpace.getMessageStore());
- if (_consumer.acquires())
- {
- txn.dequeue(Collections.singleton(queueEntry),
- new ServerTransaction.Action()
- {
- public void postCommit()
- {
- queueEntry.release(_consumer);
- }
-
- public void onRollback()
- {
- }
- });
- }
- }
- //_unsettledMap.remove(deliveryTag);
- initialUnsettledMap.remove(deliveryTag);
- _resumeAcceptedTransfers.add(deliveryTag);
- }
- else
- {
- _resumeFullTransfers.add(queueEntry);
- // exists in receivers map, but not yet got an outcome ... should resend with resume = true
- }
- // TODO - else
- }
- }
-
- _target.updateNotifyWorkDesired();
- }
-
- public Map getUnsettledOutcomeMap()
- {
- Map<Binary, MessageInstance> unsettled = new HashMap<Binary, MessageInstance>(_unsettledMap);
-
- for(Map.Entry<Binary, MessageInstance> entry : unsettled.entrySet())
- {
- entry.setValue(null);
- }
-
- return unsettled;
- }
-
- public void setCloseAction(Runnable action)
- {
- _closeAction = action;
- }
-
- public NamedAddressSpace getAddressSpace()
- {
- return _addressSpace;
- }
-
- public MessageInstanceConsumer getConsumer()
- {
- return _consumer;
- }
-
- public ConsumerTarget_1_0 getConsumerTarget()
- {
- return _target;
- }
-
- public SendingDestination getDestination()
- {
- return _destination;
+ return _linkEndpoint;
}
- public void setDestination(final SendingDestination destination)
+ public synchronized void setLinkAttachment(final Session_1_0 session,
+ final SendingLinkEndpoint linkEndpoint) throws AmqpErrorException
{
- _destination = destination;
+ _linkEndpoint = linkEndpoint;
+ _linkEndpoint.doLinkAttachment(session, getEndpoint().getConsumer());
}
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1783244&r1=1783243&r2=1783244&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Feb 16 16:11:27 2017
@@ -228,9 +228,26 @@ public class Session_1_0 extends Abstrac
LinkEndpoint endpoint = linkMap.get(attach.getName());
if(endpoint == null)
{
- endpoint = attach.getRole() == Role.RECEIVER
- ? new SendingLinkEndpoint(this, attach)
- : new ReceivingLinkEndpoint(this, attach);
+
+ if (attach.getRole() == Role.RECEIVER)
+ {
+ endpoint = new SendingLinkEndpoint(this, attach);
+ }
+ else if (attach.getRole() == Role.SENDER)
+ {
+ if (attach.getTarget() instanceof Coordinator)
+ {
+ endpoint = new TxnCoordinatorReceivingLinkEndpoint(this, attach);
+ }
+ else
+ {
+ endpoint = new StandardReceivingLinkEndpoint(this, attach);
+ }
+ }
+ else
+ {
+ // TODO error handling
+ }
if(_blockingEntities.contains(this) && attach.getRole() == Role.SENDER)
{
@@ -770,7 +787,7 @@ public class Session_1_0 extends Abstrac
}
else
{
- link.start();
+ endpoint.start();
}
}
@@ -873,17 +890,15 @@ public class Session_1_0 extends Abstrac
if (destination != null)
{
final ReceivingDestination receivingDestination = (ReceivingDestination) destination;
+
MessageDestination messageDestination = receivingDestination.getMessageDestination();
if(!(messageDestination instanceof Queue) || ((Queue<?>)messageDestination).isHoldOnPublishEnabled())
{
capabilities.add(DELAYED_DELIVERY);
}
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- final StandardReceivingLink_1_0 receivingLink =
- new StandardReceivingLink_1_0(new ReceivingLinkAttachment(this, receivingLinkEndpoint),
- getAddressSpace(),
- receivingDestination);
-
+ final StandardReceivingLink_1_0 receivingLink = new StandardReceivingLink_1_0(receivingLinkEndpoint);
+ receivingLinkEndpoint.setDestination(receivingDestination);
receivingLinkEndpoint.setLink(receivingLink);
link = receivingLink;
if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
@@ -895,11 +910,11 @@ public class Session_1_0 extends Abstrac
}
else
{
- ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
- previousLink.setLinkAttachment(new ReceivingLinkAttachment(this, receivingLinkEndpoint));
+ StandardReceivingLinkEndpoint receivingLinkEndpoint = (StandardReceivingLinkEndpoint) endpoint;
+ previousLink.setLinkAttachment(receivingLinkEndpoint);
receivingLinkEndpoint.setLink(previousLink);
link = previousLink;
- endpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
+ receivingLinkEndpoint.setLocalUnsettled(receivingLinkEndpoint.getUnsettledOutcomeMap());
}
return link;
}
@@ -934,9 +949,8 @@ public class Session_1_0 extends Abstrac
final ReceivingLinkEndpoint receivingLinkEndpoint = (ReceivingLinkEndpoint) endpoint;
final TxnCoordinatorReceivingLink_1_0 coordinatorLink =
- new TxnCoordinatorReceivingLink_1_0(getAddressSpace(),
- this,
- receivingLinkEndpoint
+ new TxnCoordinatorReceivingLink_1_0(
+ receivingLinkEndpoint
);
receivingLinkEndpoint.setLink(coordinatorLink);
return coordinatorLink;
@@ -1056,13 +1070,16 @@ public class Session_1_0 extends Abstrac
if (destination != null)
{
final SendingLink_1_0 sendingLink =
- new SendingLink_1_0(new SendingLinkAttachment(this, sendingLinkEndpoint),
- getAddressSpace(),
- destination);
- sendingLink.createConsumerTarget();
+ new SendingLink_1_0(
+ sendingLinkEndpoint);
+ //sendingLink.createConsumerTarget();
+
+ sendingLinkEndpoint.doStuff(destination);
+ sendingLinkEndpoint.createConsumerTarget();
sendingLinkEndpoint.setLink(sendingLink);
- registerConsumer(sendingLink);
+ sendingLinkEndpoint.setDurability(((Source) attach.getSource()).getDurable());
+ registerConsumer(sendingLinkEndpoint);
if (destination instanceof ExchangeDestination)
{
@@ -1084,31 +1101,42 @@ public class Session_1_0 extends Abstrac
Source newSource = (Source) attach.getSource();
Source oldSource = (Source) previousLink.getEndpoint().getSource();
- if (previousLink.getDestination() instanceof ExchangeDestination && newSource != null && !Boolean.TRUE.equals(newSource.getDynamic()))
+ if (sendingLinkEndpoint.getDestination() == null)
{
- final SendingDestination newDestination = getSendingDestination(previousLink.getEndpoint().getName(), newSource);
- if (updateSourceForSubscription(previousLink, newSource, newDestination))
+ final SendingDestination oldDestination =
+ getSendingDestination(previousLink.getEndpoint().getName(), oldSource);
+ sendingLinkEndpoint.doStuff(oldDestination);
+ sendingLinkEndpoint.setDurability(oldSource.getDurable());
+
+ }
+
+ if (sendingLinkEndpoint.getDestination() instanceof ExchangeDestination
+ && newSource != null
+ && !Boolean.TRUE.equals(newSource.getDynamic()))
+ {
+ final SendingDestination newDestination =
+ getSendingDestination(previousLink.getEndpoint().getName(), newSource);
+ if (updateSourceForSubscription(sendingLinkEndpoint, newSource, newDestination))
{
- previousLink.setDestination(newDestination);
+ sendingLinkEndpoint.setDestination(newDestination);
}
}
sendingLinkEndpoint.setSource(oldSource);
- previousLink.setLinkAttachment(new SendingLinkAttachment(this, sendingLinkEndpoint));
+ previousLink.setLinkAttachment(this, sendingLinkEndpoint);
sendingLinkEndpoint.setLink(previousLink);
link = previousLink;
- sendingLinkEndpoint.setLocalUnsettled(previousLink.getUnsettledOutcomeMap());
- registerConsumer(previousLink);
+ sendingLinkEndpoint.setLocalUnsettled(sendingLinkEndpoint.getUnsettledOutcomeMap());
+ registerConsumer(sendingLinkEndpoint);
}
return link;
}
- private boolean updateSourceForSubscription(final SendingLink_1_0 previousLink,
- final Source newSource,
+ private boolean updateSourceForSubscription(final SendingLinkEndpoint linkEndpoint, final Source newSource,
final SendingDestination newDestination)
{
- SendingDestination oldDestination = previousLink.getDestination();
+ SendingDestination oldDestination = linkEndpoint.getDestination();
if (oldDestination instanceof ExchangeDestination)
{
ExchangeDestination oldExchangeDestination = (ExchangeDestination) oldDestination;
@@ -1118,7 +1146,7 @@ public class Session_1_0 extends Abstrac
ExchangeDestination newExchangeDestination = (ExchangeDestination) newDestination;
if (oldExchangeDestination.getQueue() != newExchangeDestination.getQueue())
{
- Source oldSource = (Source) previousLink.getEndpoint().getSource();
+ Source oldSource = (Source) linkEndpoint.getSource();
oldSource.setAddress(newAddress);
oldSource.setFilter(newSource.getFilter());
return true;
@@ -1354,9 +1382,9 @@ public class Session_1_0 extends Abstrac
}
- private void registerConsumer(final SendingLink_1_0 link)
+ private void registerConsumer(final SendingLinkEndpoint linkEndpoint)
{
- MessageInstanceConsumer consumer = link.getConsumer();
+ MessageInstanceConsumer consumer = linkEndpoint.getConsumer();
if(consumer instanceof Consumer<?,?>)
{
Consumer<?,ConsumerTarget_1_0> modelConsumer = (Consumer<?,ConsumerTarget_1_0>) consumer;
@@ -1563,8 +1591,7 @@ public class Session_1_0 extends Abstrac
{
for(SendingLinkEndpoint endpoint : _sendingLinkMap.values())
{
- Link_1_0 link = endpoint.getLink();
- ConsumerTarget_1_0 target = ((SendingLink_1_0)link).getConsumerTarget();
+ ConsumerTarget_1_0 target = endpoint.getConsumerTarget();
target.flowStateChanged();
}
@@ -1597,9 +1624,7 @@ public class Session_1_0 extends Abstrac
for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
{
- StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
-
- if (isQueueDestinationForLink(queue, link.getDestination()))
+ if (isQueueDestinationForLink(queue, endpoint.getReceivingDestination()))
{
endpoint.setStopped(true);
}
@@ -1639,8 +1664,7 @@ public class Session_1_0 extends Abstrac
}
for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
{
- StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
- if (isQueueDestinationForLink(queue, link.getDestination()))
+ if (isQueueDestinationForLink(queue, endpoint.getReceivingDestination()))
{
endpoint.setStopped(false);
}
@@ -1701,8 +1725,7 @@ public class Session_1_0 extends Abstrac
}
for(ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
{
- StandardReceivingLink_1_0 link = (StandardReceivingLink_1_0) endpoint.getLink();
- if(!_blockingEntities.contains(link.getDestination()))
+ if(!_blockingEntities.contains(endpoint.getReceivingDestination()))
{
endpoint.setStopped(false);
}
@@ -1925,7 +1948,7 @@ public class Session_1_0 extends Abstrac
{
try
{
- link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+ link.setLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint);
}
catch (AmqpErrorException e)
{
@@ -1947,7 +1970,7 @@ public class Session_1_0 extends Abstrac
{
if (link.getEndpoint() == linkEndpoint)
{
- link.setLinkAttachment(new ReceivingLinkAttachment(null, (ReceivingLinkEndpoint) linkEndpoint));
+ link.setLinkAttachment((ReceivingLinkEndpoint) linkEndpoint);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org