You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/23 14:27:39 UTC

svn commit: r1770975 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue: AbstractQueue.java QueueConsumerManager.java QueueConsumerManagerImpl.java QueueConsumerNode.java

Author: lquack
Date: Wed Nov 23 14:27:38 2016
New Revision: 1770975

URL: http://svn.apache.org/viewvc?rev=1770975&view=rev
Log:
QPID-7514: [Java Broker] remove affirmation logic from QueueConsumerManager

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 23 14:27:38 2016
@@ -1861,8 +1861,7 @@ public abstract class AbstractQueue<X ex
         boolean queueEmpty = false;
         MessageContainer messageContainer = null;
 
-        _queueConsumerManager.clearStateAffirmationFlag(consumer);
-
+        _queueConsumerManager.setNotified(consumer, false);
         try
         {
 
@@ -1870,11 +1869,8 @@ public abstract class AbstractQueue<X ex
             {
                 messageContainer = attemptDelivery(consumer);
 
-
                 if(messageContainer.getMessageInstance() == null)
                 {
-                    _queueConsumerManager.setNotified(consumer, false, true);
-
                     if (messageContainer.hasNoAvailableMessages())
                     {
                         queueEmpty = true;
@@ -1889,6 +1885,10 @@ public abstract class AbstractQueue<X ex
                     }
                     messageContainer = null;
                 }
+                else
+                {
+                    _queueConsumerManager.setNotified(consumer, true);
+                }
             }
             else
             {
@@ -2225,7 +2225,7 @@ public abstract class AbstractQueue<X ex
 
     private boolean notifyConsumer(final QueueConsumer<?> consumer)
     {
-        if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true, false))
+        if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true))
         {
             consumer.notifyWork();
             return true;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Wed Nov 23 14:27:38 2016
@@ -26,19 +26,13 @@ public interface QueueConsumerManager
 {
     void addConsumer(QueueConsumer<?> consumer);
     boolean removeConsumer(QueueConsumer<?> consumer);
-    /*public*/ boolean setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
-    /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified, final boolean conditional); // called from Queue
+    boolean setInterest(QueueConsumer<?> consumer, boolean interested);
+    boolean setNotified(QueueConsumer<?> consumer, boolean notified);
 
-    // should be priority and then insertion order
     Iterator<QueueConsumer<?>> getInterestedIterator();
-
     Iterator<QueueConsumer<?>> getAllIterator();
     Iterator<QueueConsumer<?>> getNonAcquiringIterator();
 
     int getAllSize();
-    //        int getInterestedSize();
-    int getNotifiedAcquiringSize();
-
-
     int getHighestNotifiedPriority();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Wed Nov 23 14:27:38 2016
@@ -81,16 +81,16 @@ public class QueueConsumerManagerImpl im
         {
             if (consumer.acquires())
             {
-                node.moveFromTo(REMOVED, NodeState.INTERESTED, false);
+                node.moveFromTo(REMOVED, NodeState.INTERESTED);
             }
             else
             {
-                node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING, false);
+                node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING);
             }
         }
         else
         {
-            node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED, false);
+            node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED);
         }
         _count++;
     }
@@ -102,7 +102,7 @@ public class QueueConsumerManagerImpl im
         removeFromAll(consumer);
         QueueConsumerNode node = consumer.getQueueConsumerNode();
 
-        if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED, false))
+        if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED))
         {
             _count--;
             return true;
@@ -119,46 +119,40 @@ public class QueueConsumerManagerImpl im
         {
             if (consumer.acquires())
             {
-                return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED, false);
+                return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED);
             }
             else
             {
-                return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING, false);
+                return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING);
             }
         }
         else
         {
             if (consumer.acquires())
             {
-                return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED, false);
+                return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED);
             }
             else
             {
-                return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED, false);
+                return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED);
             }
         }
     }
 
-    public void clearStateAffirmationFlag(final QueueConsumer consumer)
-    {
-        QueueConsumerNode node = consumer.getQueueConsumerNode();
-        node.clearAffirmation();
-    }
-
     // Set by the Queue any IO thread
     @Override
-    public boolean setNotified(final QueueConsumer consumer, final boolean notified, final boolean conditional)
+    public boolean setNotified(final QueueConsumer consumer, final boolean notified)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
         if (consumer.acquires())
         {
             if (notified)
             {
-                return node.moveFromTo(INTERESTED, NodeState.NOTIFIED, conditional);
+                return node.moveFromTo(INTERESTED, NodeState.NOTIFIED);
             }
             else
             {
-                return node.moveFromTo(NOTIFIED, NodeState.INTERESTED, conditional);
+                return node.moveFromTo(NOTIFIED, NodeState.INTERESTED);
             }
         }
         else
@@ -192,12 +186,6 @@ public class QueueConsumerManagerImpl im
     }
 
     @Override
-    public int getNotifiedAcquiringSize()
-    {
-        return _notified.size();
-    }
-
-    @Override
     public int getHighestNotifiedPriority()
     {
         final Iterator<QueueConsumerNode> notifiedIterator =

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java Wed Nov 23 14:27:38 2016
@@ -29,7 +29,6 @@ final class QueueConsumerNode
     private QueueConsumerNodeListEntry _listEntry;
     private QueueConsumerManagerImpl.NodeState _state = QueueConsumerManagerImpl.NodeState.REMOVED;
     private QueueConsumerNodeListEntry _allEntry;
-    private boolean _affirmed;
 
     QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, final QueueConsumer<?> queueConsumer)
     {
@@ -47,37 +46,21 @@ final class QueueConsumerNode
         return _state;
     }
 
-    public synchronized void clearAffirmation()
-    {
-        _affirmed = false;
-    }
-
-
     public synchronized boolean moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates,
-                                           QueueConsumerManagerImpl.NodeState toState,
-                                           final boolean conditional)
+                                           QueueConsumerManagerImpl.NodeState toState)
     {
         if (fromStates.contains(_state))
         {
-            if(!conditional || !_affirmed)
-            {
-                if (_listEntry != null)
-                {
-                    _listEntry.remove();
-                }
-                _state = toState;
-                _listEntry = _queueConsumerManager.addNodeToInterestList(this);
-                return true;
-            }
-            else
+            if (_listEntry != null)
             {
-                _affirmed = false;
-                return false;
+                _listEntry.remove();
             }
+            _state = toState;
+            _listEntry = _queueConsumerManager.addNodeToInterestList(this);
+            return true;
         }
         else
         {
-            _affirmed = _state == toState;
             return false;
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org