You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/18 14:45:13 UTC
svn commit: r1770385 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0: LinkEndpoint.java ReceivingLinkEndpoint.java ReceivingLink_1_0.java Session_1_0.java
Author: rgodfrey
Date: Fri Nov 18 14:45:12 2016
New Revision: 1770385
URL: http://svn.apache.org/viewvc?rev=1770385&view=rev
Log:
QPID-7529 : Implement producer flow control in AMQP 1.0
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Nov 18 14:45:12 2016
@@ -52,6 +52,10 @@ public abstract class LinkEndpoint<T ext
private Map _initialUnsettledMap;
private Map _localUnsettled;
private UnsignedInteger _lastSentCreditLimit;
+ private volatile boolean _stopped;
+ private volatile boolean _stoppedUpdated;
+
+ private Link_1_0 _link;
private enum State
@@ -108,6 +112,20 @@ public abstract class LinkEndpoint<T ext
_state = State.ATTACH_RECVD;
}
+ public boolean isStopped()
+ {
+ return _stopped;
+ }
+
+ public void setStopped(final boolean stopped)
+ {
+ if(_stopped != stopped)
+ {
+ _stopped = stopped;
+ _stoppedUpdated = true;
+ }
+ }
+
protected abstract Map<Symbol,Object> initProperties(final Attach attach);
public String getName()
@@ -386,15 +404,25 @@ public abstract class LinkEndpoint<T ext
{
if(_lastSentCreditLimit != null)
{
- UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
- int i = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit);
- if(i >=0)
+ if(_stoppedUpdated)
{
sendFlow(_flowTransactionId != null);
+ _stoppedUpdated = false;
}
else
{
- getSession().sendFlowConditional();
+ UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
+
+ // client has used up over half their credit allowance ?
+ boolean sendFlow = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit) >= 0;
+ if (sendFlow)
+ {
+ sendFlow(_flowTransactionId != null);
+ }
+ else
+ {
+ getSession().sendFlowConditional();
+ }
}
}
else
@@ -425,10 +453,18 @@ public abstract class LinkEndpoint<T ext
if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
{
Flow flow = new Flow();
- flow.setLinkCredit(_linkCredit);
flow.setDeliveryCount(_deliveryCount);
flow.setEcho(echo);
- _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+ if(_stopped)
+ {
+ flow.setLinkCredit(_linkCredit);
+ _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+ }
+ else
+ {
+ flow.setLinkCredit(UnsignedInteger.ZERO);
+ _lastSentCreditLimit = _deliveryCount;
+ }
flow.setAvailable(_available);
flow.setDrain(_drain);
if(setTransactionId)
@@ -488,6 +524,16 @@ public abstract class LinkEndpoint<T ext
_localUnsettled = unsettled;
}
+ public Link_1_0 getLink()
+ {
+ return _link;
+ }
+
+ public void setLink(final Link_1_0 link)
+ {
+ _link = link;
+ }
+
@Override public String toString()
{
return "LinkEndpoint{" +
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Fri Nov 18 14:45:12 2016
@@ -93,18 +93,6 @@ public class ReceivingLinkEndpoint exten
private UnsignedInteger _drainLimit;
- public ReceivingLinkEndpoint(final Session_1_0 session, String name)
- {
- this(session,name,null);
- }
-
- public ReceivingLinkEndpoint(final Session_1_0 session, String name, Map<Binary, Outcome> unsettledMap)
- {
- super(session, name, unsettledMap);
- setDeliveryCount(UnsignedInteger.valueOf(0));
- setLinkEventListener(ReceivingLinkListener.DEFAULT);
- }
-
public ReceivingLinkEndpoint(final Session_1_0 session, final Attach attach)
{
super(session, attach);
@@ -370,7 +358,7 @@ public class ReceivingLinkEndpoint exten
{
if (settled(deliveryTag) && _creditWindow)
{
- setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1)));
+ setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
}
}
sendFlowConditional();
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Nov 18 14:45:12 2016
@@ -336,4 +336,9 @@ public class ReceivingLink_1_0 implement
{
return _unsettledMap;
}
+
+ public ReceivingDestination getDestination()
+ {
+ return _destination;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Nov 18 14:45:12 2016
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,7 +127,7 @@ public class Session_1_0 implements AMQS
private final Subject _subject = new Subject();
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
- private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
+
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
@@ -135,7 +136,8 @@ public class Session_1_0 implements AMQS
private SessionState _state ;
- private final Map<String, LinkEndpoint> _linkMap = new HashMap<>();
+ private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>();
+ private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
private long _lastAttachedTime;
@@ -168,6 +170,8 @@ public class Session_1_0 implements AMQS
new Error(LinkError.DETACH_FORCED,
"Force detach the link because the session is remotely ended.");
+ private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>());
+
public Session_1_0(final AMQPConnection_1_0 connection)
{
@@ -230,17 +234,24 @@ public class Session_1_0 implements AMQS
}
else
{
- LinkEndpoint endpoint = _linkMap.get(attach.getName());
+ Map<String, ? extends LinkEndpoint> linkMap =
+ attach.getRole() == Role.RECEIVER ? _sendingLinkMap : _receivingLinkMap;
+ LinkEndpoint endpoint = linkMap.get(attach.getName());
if(endpoint == null)
{
endpoint = attach.getRole() == Role.RECEIVER
? new SendingLinkEndpoint(this, attach)
: new ReceivingLinkEndpoint(this, attach);
+ if(_blockingEntities.contains(this) && attach.getRole() == Role.SENDER)
+ {
+ endpoint.setStopped(true);
+ }
+
// TODO : fix below - distinguish between local and remote owned
endpoint.setSource(attach.getSource());
endpoint.setTarget(attach.getTarget());
- _linkMap.put(attach.getName(), endpoint);
+ ((Map<String,LinkEndpoint>)linkMap).put(attach.getName(), endpoint);
}
else
{
@@ -265,7 +276,7 @@ public class Session_1_0 implements AMQS
}
else
{
- endpoint.receiveAttach(attach);
+ // TODO - error already attached
}
}
}
@@ -942,6 +953,11 @@ public class Session_1_0 implements AMQS
target.getCapabilities());
target.setCapabilities(destination.getCapabilities());
+ if(_blockingEntities.contains(messageDestination))
+ {
+ endpoint.setStopped(true);
+ }
+
}
else if (!addr.startsWith("/") && addr.contains("/"))
{
@@ -1011,7 +1027,6 @@ public class Session_1_0 implements AMQS
receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(
receivingLink));
-
link = receivingLink;
if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
|| TerminusDurability.CONFIGURATION.equals(target.getDurable()))
@@ -1045,6 +1060,7 @@ public class Session_1_0 implements AMQS
}
else
{
+ endpoint.setLink(link);
link.start();
}
}
@@ -1057,7 +1073,6 @@ public class Session_1_0 implements AMQS
{
Consumer<?> modelConsumer = (Consumer<?>) consumer;
_consumers.add(modelConsumer);
- _sendingLinks.add(link);
modelConsumer.addChangeListener(_consumerClosedListener);
consumerAdded(modelConsumer);
}
@@ -1253,15 +1268,18 @@ public class Session_1_0 implements AMQS
@Override
public void transportStateChanged()
{
- for(SendingLink_1_0 link : _sendingLinks)
+ for(SendingLinkEndpoint endpoint : _sendingLinkMap.values())
{
- ConsumerTarget_1_0 target = link.getConsumerTarget();
+ Link_1_0 link = endpoint.getLink();
+ ConsumerTarget_1_0 target = ((SendingLink_1_0)link).getConsumerTarget();
target.flowStateChanged();
}
+
if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
{
getAMQPConnection().notifyWork(this);
}
+
}
@Override
@@ -1271,34 +1289,124 @@ public class Session_1_0 implements AMQS
}
@Override
- public void block(Queue<?> queue)
+ public void block(final Queue<?> queue)
{
- // TODO - required for AMQSessionModel / producer side flow control
+ getAMQPConnection().doOnIOThreadAsync(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doBlock(queue);
+ }
+ });
+ }
+
+ private void doBlock(final Queue<?> queue)
+ {
+ if(_blockingEntities.add(queue))
+ {
+ for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+ {
+ ReceivingLink_1_0 link = (ReceivingLink_1_0) endpoint.getLink();
+ if (queue == link.getDestination())
+ {
+ endpoint.setStopped(true);
+ }
+ }
+
+ }
}
@Override
- public void unblock(Queue<?> queue)
+ public void unblock(final Queue<?> queue)
+ {
+ getAMQPConnection().doOnIOThreadAsync(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doUnblock(queue);
+ }
+ });
+ }
+
+ private void doUnblock(final Queue<?> queue)
{
- // TODO - required for AMQSessionModel / producer side flow control
+ if(_blockingEntities.remove(queue) && !_blockingEntities.contains(this))
+ {
+ for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+ {
+ ReceivingLink_1_0 link = (ReceivingLink_1_0) endpoint.getLink();
+ if (queue == link.getDestination())
+ {
+ endpoint.setStopped(false);
+ }
+ }
+ }
}
@Override
public void block()
{
- // TODO - required for AMQSessionModel / producer side flow control
+ getAMQPConnection().doOnIOThreadAsync(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doBlock();
+ }
+ });
+ }
+
+ private void doBlock()
+ {
+ if(_blockingEntities.add(this))
+ {
+ for(LinkEndpoint endpoint : _receivingLinkMap.values())
+ {
+ endpoint.setStopped(true);
+ }
+ }
}
+
+
@Override
public void unblock()
{
- // TODO - required for AMQSessionModel / producer side flow control
+ getAMQPConnection().doOnIOThreadAsync(
+ new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ doUnblock();
+ }
+ });
+ }
+
+ private void doUnblock()
+ {
+ if(_blockingEntities.remove(this))
+ {
+ for(ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+ {
+ ReceivingLink_1_0 link = (ReceivingLink_1_0) endpoint.getLink();
+ if(!_blockingEntities.contains(link.getDestination()))
+ {
+ endpoint.setStopped(false);
+ }
+ }
+ }
}
@Override
public boolean getBlocking()
{
- // TODO
- return false;
+ return !_blockingEntities.isEmpty();
}
@Override
@@ -1638,7 +1746,8 @@ public class Session_1_0 implements AMQS
if (Boolean.TRUE.equals(detach.getClosed()))
{
- _linkMap.remove(endpoint.getName());
+ Map<String, ? extends LinkEndpoint> linkMap = endpoint.getRole() == Role.SENDER ? _sendingLinkMap : _receivingLinkMap;
+ linkMap.remove(endpoint.getName());
}
}
else
@@ -1661,20 +1770,33 @@ public class Session_1_0 implements AMQS
final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
- for(LinkEndpoint<?> linkEndpoint : _linkMap.values())
+ for(LinkEndpoint<?> linkEndpoint : _sendingLinkMap.values())
{
- if (linkEndpoint.getRole() == Role.SENDER)
+ final SendingLink_1_0 link = (SendingLink_1_0) linkRegistry.getDurableSendingLink(linkEndpoint.getName());
+
+ if (link != null)
{
- final SendingLink_1_0 link = (SendingLink_1_0) linkRegistry.getDurableSendingLink(linkEndpoint.getName());
+ synchronized (link)
+ {
+ if (link.getEndpoint() == linkEndpoint)
+ {
+ link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+ }
+ }
+ }
+ }
- if (link != null)
+ for(LinkEndpoint<?> linkEndpoint : _receivingLinkMap.values())
+ {
+ final ReceivingLink_1_0 link = (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(linkEndpoint.getName());
+
+ if (link != null)
+ {
+ synchronized (link)
{
- synchronized (link)
+ if (link.getEndpoint() == linkEndpoint)
{
- if (link.getEndpoint() == linkEndpoint)
- {
- link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
- }
+ link.setLinkAttachment(new ReceivingLinkAttachment(null, (ReceivingLinkEndpoint) linkEndpoint));
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org