You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/16 21:36:27 UTC

svn commit: r1770060 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue: AbstractQueue.java QueueConsumerManager.java QueueConsumerManagerImpl.java QueueConsumerNode.java

Author: rgodfrey
Date: Wed Nov 16 21:36:27 2016
New Revision: 1770060

URL: http://svn.apache.org/viewvc?rev=1770060&view=rev
Log:
QPID-7514 : reduce the occurrences of consumers being removed and then immediately being reinserted into a list

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 16 21:36:27 2016
@@ -1880,19 +1880,20 @@ public abstract class AbstractQueue<X ex
         boolean queueEmpty = false;
         MessageContainer messageContainer = null;
 
-        _queueConsumerManager.setNotified(consumer, false);
+        _queueConsumerManager.clearStateAffirmationFlag(consumer);
 
         try
         {
+
             if (!consumer.isSuspended())
             {
                 messageContainer = attemptDelivery(consumer);
-                if(messageContainer.getMessageInstance() != null)
-                {
-                    _queueConsumerManager.setNotified(consumer, true);
-                }
-                else
+
+
+                if(messageContainer.getMessageInstance() == null)
                 {
+                    _queueConsumerManager.setNotified(consumer, false, true);
+
                     if (messageContainer.hasNoAvailableMessages())
                     {
                         queueEmpty = true;
@@ -2243,7 +2244,7 @@ public abstract class AbstractQueue<X ex
 
     private boolean notifyConsumer(final QueueConsumer<?> consumer)
     {
-        if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true))
+        if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true, false))
         {
             consumer.notifyWork();
             return true;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Wed Nov 16 21:36:27 2016
@@ -27,7 +27,7 @@ public interface QueueConsumerManager
     void addConsumer(QueueConsumer<?> consumer);
     boolean removeConsumer(QueueConsumer<?> consumer);
     /*public*/ boolean setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
-    /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified); // called from Queue
+    /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified, final boolean conditional); // called from Queue
 
     // should be priority and then insertion order
     Iterator<QueueConsumer<?>> getInterestedIterator();

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Wed Nov 16 21:36:27 2016
@@ -81,16 +81,16 @@ public class QueueConsumerManagerImpl im
         {
             if (consumer.acquires())
             {
-                node.moveFromTo(REMOVED, NodeState.INTERESTED);
+                node.moveFromTo(REMOVED, NodeState.INTERESTED, false);
             }
             else
             {
-                node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING);
+                node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING, false);
             }
         }
         else
         {
-            node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED);
+            node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED, false);
         }
         _count++;
     }
@@ -102,7 +102,7 @@ public class QueueConsumerManagerImpl im
         removeFromAll(consumer);
         QueueConsumerNode node = consumer.getQueueConsumerNode();
 
-        if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED))
+        if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED, false))
         {
             _count--;
             return true;
@@ -119,40 +119,46 @@ public class QueueConsumerManagerImpl im
         {
             if (consumer.acquires())
             {
-                return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED);
+                return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED, false);
             }
             else
             {
-                return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING);
+                return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING, false);
             }
         }
         else
         {
             if (consumer.acquires())
             {
-                return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED);
+                return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED, false);
             }
             else
             {
-                return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED);
+                return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED, false);
             }
         }
     }
 
+    public void clearStateAffirmationFlag(final QueueConsumer consumer)
+    {
+        QueueConsumerNode node = consumer.getQueueConsumerNode();
+        node.clearAffirmaion();
+    }
+
     // Set by the Queue any IO thread
     @Override
-    public boolean setNotified(final QueueConsumer consumer, final boolean notified)
+    public boolean setNotified(final QueueConsumer consumer, final boolean notified, final boolean conditional)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
         if (consumer.acquires())
         {
             if (notified)
             {
-                return node.moveFromTo(INTERESTED, NodeState.NOTIFIED);
+                return node.moveFromTo(INTERESTED, NodeState.NOTIFIED, conditional);
             }
             else
             {
-                return node.moveFromTo(NOTIFIED, NodeState.INTERESTED);
+                return node.moveFromTo(NOTIFIED, NodeState.INTERESTED, conditional);
             }
         }
         else

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java Wed Nov 16 21:36:27 2016
@@ -29,6 +29,7 @@ final class QueueConsumerNode
     private QueueConsumerNodeListEntry _listEntry;
     private QueueConsumerManagerImpl.NodeState _state = QueueConsumerManagerImpl.NodeState.REMOVED;
     private QueueConsumerNodeListEntry _allEntry;
+    private boolean _affirmed;
 
     QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, final QueueConsumer<?> queueConsumer)
     {
@@ -46,20 +47,37 @@ final class QueueConsumerNode
         return _state;
     }
 
-    public synchronized boolean moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates, QueueConsumerManagerImpl.NodeState toState)
+    public synchronized void clearAffirmaion()
+    {
+        _affirmed = false;
+    }
+
+
+    public synchronized boolean moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates,
+                                           QueueConsumerManagerImpl.NodeState toState,
+                                           final boolean conditional)
     {
         if (fromStates.contains(_state))
         {
-            if (_listEntry != null)
+            if(!conditional || !_affirmed)
+            {
+                if (_listEntry != null)
+                {
+                    _listEntry.remove();
+                }
+                _state = toState;
+                _listEntry = _queueConsumerManager.addNodeToInterestList(this);
+                return true;
+            }
+            else
             {
-                _listEntry.remove();
+                _affirmed = false;
+                return false;
             }
-            _state = toState;
-            _listEntry = _queueConsumerManager.addNodeToInterestList(this);
-            return true;
         }
         else
         {
+            _affirmed = _state == toState;
             return false;
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org