You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/18 14:45:13 UTC

svn commit: r1770385 - in /qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0: LinkEndpoint.java ReceivingLinkEndpoint.java ReceivingLink_1_0.java Session_1_0.java

Author: rgodfrey
Date: Fri Nov 18 14:45:12 2016
New Revision: 1770385

URL: http://svn.apache.org/viewvc?rev=1770385&view=rev
Log:
QPID-7529 : Implement producer flow control in AMQP 1.0

Modified:
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Nov 18 14:45:12 2016
@@ -52,6 +52,10 @@ public abstract class LinkEndpoint<T ext
     private Map _initialUnsettledMap;
     private Map _localUnsettled;
     private UnsignedInteger _lastSentCreditLimit;
+    private volatile boolean _stopped;
+    private volatile boolean _stoppedUpdated;
+
+    private Link_1_0 _link;
 
 
     private enum State
@@ -108,6 +112,20 @@ public abstract class LinkEndpoint<T ext
         _state = State.ATTACH_RECVD;
     }
 
+    public boolean isStopped()
+    {
+        return _stopped;
+    }
+
+    public void setStopped(final boolean stopped)
+    {
+        if(_stopped != stopped)
+        {
+            _stopped = stopped;
+            _stoppedUpdated = true;
+        }
+    }
+
     protected abstract Map<Symbol,Object> initProperties(final Attach attach);
 
     public String getName()
@@ -386,15 +404,25 @@ public abstract class LinkEndpoint<T ext
     {
         if(_lastSentCreditLimit != null)
         {
-            UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
-            int i = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit);
-            if(i >=0)
+            if(_stoppedUpdated)
             {
                 sendFlow(_flowTransactionId != null);
+                _stoppedUpdated = false;
             }
             else
             {
-                getSession().sendFlowConditional();
+                UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
+
+                // client has used up over half their credit allowance ?
+                boolean sendFlow = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit) >= 0;
+                if (sendFlow)
+                {
+                    sendFlow(_flowTransactionId != null);
+                }
+                else
+                {
+                    getSession().sendFlowConditional();
+                }
             }
         }
         else
@@ -425,10 +453,18 @@ public abstract class LinkEndpoint<T ext
         if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
         {
             Flow flow = new Flow();
-            flow.setLinkCredit(_linkCredit);
             flow.setDeliveryCount(_deliveryCount);
             flow.setEcho(echo);
-            _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+            if(_stopped) // blocked by producer flow control: advertise no credit
+            {
+                flow.setLinkCredit(UnsignedInteger.ZERO);
+                _lastSentCreditLimit = _deliveryCount;
+            }
+            else // normal operation: advertise the current credit window
+            {
+                flow.setLinkCredit(_linkCredit);
+                _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+            }
             flow.setAvailable(_available);
             flow.setDrain(_drain);
             if(setTransactionId)
@@ -488,6 +524,16 @@ public abstract class LinkEndpoint<T ext
         _localUnsettled = unsettled;
     }
 
+    public Link_1_0 getLink()
+    {
+        return _link;
+    }
+
+    public void setLink(final Link_1_0 link)
+    {
+        _link = link;
+    }
+
     @Override public String toString()
     {
         return "LinkEndpoint{" +

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Fri Nov 18 14:45:12 2016
@@ -93,18 +93,6 @@ public class ReceivingLinkEndpoint exten
     private UnsignedInteger _drainLimit;
 
 
-    public ReceivingLinkEndpoint(final Session_1_0 session, String name)
-    {
-        this(session,name,null);
-    }
-
-    public ReceivingLinkEndpoint(final Session_1_0 session, String name, Map<Binary, Outcome> unsettledMap)
-    {
-        super(session, name, unsettledMap);
-        setDeliveryCount(UnsignedInteger.valueOf(0));
-        setLinkEventListener(ReceivingLinkListener.DEFAULT);
-    }
-
     public ReceivingLinkEndpoint(final Session_1_0 session, final Attach attach)
     {
         super(session, attach);
@@ -370,7 +358,7 @@ public class ReceivingLinkEndpoint exten
             {
                 if (settled(deliveryTag) && _creditWindow)
                 {
-                    setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1)));
+                    setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
                 }
             }
             sendFlowConditional();

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Nov 18 14:45:12 2016
@@ -336,4 +336,9 @@ public class ReceivingLink_1_0 implement
     {
         return _unsettledMap;
     }
+
+    public ReceivingDestination getDestination()
+    {
+        return _destination;
+    }
 }

Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1770385&r1=1770384&r2=1770385&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Nov 18 14:45:12 2016
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -126,7 +127,7 @@ public class Session_1_0 implements AMQS
     private final Subject _subject = new Subject();
 
     private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
-    private final CopyOnWriteArrayList<SendingLink_1_0> _sendingLinks = new CopyOnWriteArrayList<>();
+
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
@@ -135,7 +136,8 @@ public class Session_1_0 implements AMQS
 
     private SessionState _state ;
 
-    private final Map<String, LinkEndpoint> _linkMap = new HashMap<>();
+    private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>();
+    private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
     private final Map<LinkEndpoint, UnsignedInteger> _localLinkEndpoints = new HashMap<>();
     private final Map<UnsignedInteger, LinkEndpoint> _remoteLinkEndpoints = new HashMap<>();
     private long _lastAttachedTime;
@@ -168,6 +170,8 @@ public class Session_1_0 implements AMQS
             new Error(LinkError.DETACH_FORCED,
                       "Force detach the link because the session is remotely ended.");
 
+    private final Set<Object> _blockingEntities = Collections.newSetFromMap(new ConcurrentHashMap<Object,Boolean>());
+
 
     public Session_1_0(final AMQPConnection_1_0 connection)
     {
@@ -230,17 +234,24 @@ public class Session_1_0 implements AMQS
             }
             else
             {
-                LinkEndpoint endpoint = _linkMap.get(attach.getName());
+                Map<String, ? extends LinkEndpoint> linkMap =
+                        attach.getRole() == Role.RECEIVER ? _sendingLinkMap : _receivingLinkMap;
+                LinkEndpoint endpoint = linkMap.get(attach.getName());
                 if(endpoint == null)
                 {
                     endpoint = attach.getRole() == Role.RECEIVER
                                ? new SendingLinkEndpoint(this, attach)
                                : new ReceivingLinkEndpoint(this, attach);
 
+                    if(_blockingEntities.contains(this) && attach.getRole() == Role.SENDER)
+                    {
+                        endpoint.setStopped(true);
+                    }
+
                     // TODO : fix below - distinguish between local and remote owned
                     endpoint.setSource(attach.getSource());
                     endpoint.setTarget(attach.getTarget());
-                    _linkMap.put(attach.getName(), endpoint);
+                    ((Map<String,LinkEndpoint>)linkMap).put(attach.getName(), endpoint);
                 }
                 else
                 {
@@ -265,7 +276,7 @@ public class Session_1_0 implements AMQS
                 }
                 else
                 {
-                    endpoint.receiveAttach(attach);
+                    // TODO - error already attached
                 }
             }
         }
@@ -942,6 +953,11 @@ public class Session_1_0 implements AMQS
                                                                        target.getCapabilities());
                             target.setCapabilities(destination.getCapabilities());
 
+                            if(_blockingEntities.contains(messageDestination))
+                            {
+                                endpoint.setStopped(true);
+                            }
+
                         }
                         else if (!addr.startsWith("/") && addr.contains("/"))
                         {
@@ -1011,7 +1027,6 @@ public class Session_1_0 implements AMQS
 
                         receivingLinkEndpoint.setLinkEventListener(new SubjectSpecificReceivingLinkListener(
                                 receivingLink));
-
                         link = receivingLink;
                         if (TerminusDurability.UNSETTLED_STATE.equals(target.getDurable())
                             || TerminusDurability.CONFIGURATION.equals(target.getDurable()))
@@ -1045,6 +1060,7 @@ public class Session_1_0 implements AMQS
         }
         else
         {
+            endpoint.setLink(link);
             link.start();
         }
     }
@@ -1057,7 +1073,6 @@ public class Session_1_0 implements AMQS
         {
             Consumer<?> modelConsumer = (Consumer<?>) consumer;
             _consumers.add(modelConsumer);
-            _sendingLinks.add(link);
             modelConsumer.addChangeListener(_consumerClosedListener);
             consumerAdded(modelConsumer);
         }
@@ -1253,15 +1268,18 @@ public class Session_1_0 implements AMQS
     @Override
     public void transportStateChanged()
     {
-        for(SendingLink_1_0 link : _sendingLinks)
+        for(SendingLinkEndpoint endpoint : _sendingLinkMap.values())
         {
-            ConsumerTarget_1_0 target = link.getConsumerTarget();
+            Link_1_0 link = endpoint.getLink();
+            ConsumerTarget_1_0 target = ((SendingLink_1_0)link).getConsumerTarget();
             target.flowStateChanged();
         }
+
         if (!_consumersWithPendingWork.isEmpty() && !getAMQPConnection().isTransportBlockedForWriting())
         {
             getAMQPConnection().notifyWork(this);
         }
+
     }
 
     @Override
@@ -1271,34 +1289,124 @@ public class Session_1_0 implements AMQS
     }
 
     @Override
-    public void block(Queue<?> queue)
+    public void block(final Queue<?> queue)
     {
-        // TODO - required for AMQSessionModel / producer side flow control
+        getAMQPConnection().doOnIOThreadAsync(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        doBlock(queue);
+                    }
+                });
+    }
+
+    private void doBlock(final Queue<?> queue)
+    {
+        if(_blockingEntities.add(queue))
+        {
+            for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+            {
+                ReceivingLink_1_0 link = (ReceivingLink_1_0) endpoint.getLink();
+                if (queue == link.getDestination())
+                {
+                    endpoint.setStopped(true);
+                }
+            }
+
+        }
     }
 
     @Override
-    public void unblock(Queue<?> queue)
+    public void unblock(final Queue<?> queue)
+    {
+        getAMQPConnection().doOnIOThreadAsync(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        doUnblock(queue);
+                    }
+                });
+    }
+
+    private void doUnblock(final Queue<?> queue)
     {
-        // TODO - required for AMQSessionModel / producer side flow control
+        if(_blockingEntities.remove(queue) && !_blockingEntities.contains(this))
+        {
+            for (ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+            {
+                ReceivingLink_1_0 link = (ReceivingLink_1_0) endpoint.getLink();
+                if (queue == link.getDestination())
+                {
+                    endpoint.setStopped(false);
+                }
+            }
+        }
     }
 
     @Override
     public void block()
     {
-        // TODO - required for AMQSessionModel / producer side flow control
+        getAMQPConnection().doOnIOThreadAsync(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        doBlock();
+                    }
+                });
+    }
+
+    private void doBlock()
+    {
+        if(_blockingEntities.add(this))
+        {
+            for(LinkEndpoint endpoint : _receivingLinkMap.values())
+            {
+                endpoint.setStopped(true);
+            }
+        }
     }
 
+
+
     @Override
     public void unblock()
     {
-        // TODO - required for AMQSessionModel / producer side flow control
+        getAMQPConnection().doOnIOThreadAsync(
+                new Runnable()
+                {
+                    @Override
+                    public void run()
+                    {
+                        doUnblock();
+                    }
+                });
+    }
+
+    private void doUnblock()
+    {
+        if(_blockingEntities.remove(this))
+        {
+            for(ReceivingLinkEndpoint endpoint : _receivingLinkMap.values())
+            {
+                ReceivingLink_1_0 link = (ReceivingLink_1_0) endpoint.getLink();
+                if(!_blockingEntities.contains(link.getDestination()))
+                {
+                    endpoint.setStopped(false);
+                }
+            }
+        }
     }
 
     @Override
     public boolean getBlocking()
     {
-        // TODO
-        return false;
+        return !_blockingEntities.isEmpty();
     }
 
     @Override
@@ -1638,7 +1746,8 @@ public class Session_1_0 implements AMQS
 
             if (Boolean.TRUE.equals(detach.getClosed()))
             {
-                _linkMap.remove(endpoint.getName());
+                Map<String, ? extends LinkEndpoint> linkMap = endpoint.getRole() == Role.SENDER ? _sendingLinkMap : _receivingLinkMap;
+                linkMap.remove(endpoint.getName());
             }
         }
         else
@@ -1661,20 +1770,33 @@ public class Session_1_0 implements AMQS
 
         final LinkRegistry linkRegistry = getAddressSpace().getLinkRegistry(getConnection().getRemoteContainerId());
 
-        for(LinkEndpoint<?> linkEndpoint : _linkMap.values())
+        for(LinkEndpoint<?> linkEndpoint : _sendingLinkMap.values())
         {
-            if (linkEndpoint.getRole() == Role.SENDER)
+            final SendingLink_1_0 link = (SendingLink_1_0) linkRegistry.getDurableSendingLink(linkEndpoint.getName());
+
+            if (link != null)
             {
-                final SendingLink_1_0 link = (SendingLink_1_0) linkRegistry.getDurableSendingLink(linkEndpoint.getName());
+                synchronized (link)
+                {
+                    if (link.getEndpoint() == linkEndpoint)
+                    {
+                        link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
+                    }
+                }
+            }
+        }
 
-                if (link != null)
+        for(LinkEndpoint<?> linkEndpoint : _receivingLinkMap.values())
+        {
+            final ReceivingLink_1_0 link = (ReceivingLink_1_0) linkRegistry.getDurableReceivingLink(linkEndpoint.getName());
+
+            if (link != null)
+            {
+                synchronized (link)
                 {
-                    synchronized (link)
+                    if (link.getEndpoint() == linkEndpoint)
                     {
-                        if (link.getEndpoint() == linkEndpoint)
-                        {
-                            link.setLinkAttachment(new SendingLinkAttachment(null, (SendingLinkEndpoint) linkEndpoint));
-                        }
+                        link.setLinkAttachment(new ReceivingLinkAttachment(null, (ReceivingLinkEndpoint) linkEndpoint));
                     }
                 }
             }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org