You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/15 09:42:20 UTC

svn commit: r1769763 - in /qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server: consumer/AbstractConsumerTarget.java queue/AbstractQueue.java queue/QueueConsumerManager.java queue/QueueConsumerManagerImpl.java

Author: lquack
Date: Tue Nov 15 09:42:20 2016
New Revision: 1769763

URL: http://svn.apache.org/viewvc?rev=1769763&view=rev
Log:
fix spurious consumer wakeups

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Nov 15 09:42:20 2016
@@ -115,17 +115,12 @@ public abstract class AbstractConsumerTa
                 }
             }
 
-            // TODO - remove once queue is smarter
             for (ConsumerImpl consumer : _consumers)
             {
                 consumer.setNotifyWorkDesired(desired);
             }
 
             _notifyWorkDesired = desired;
-            if (desired)
-            {
-                notifyWork();
-            }
         }
     }
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 09:42:20 2016
@@ -283,28 +283,29 @@ public abstract class AbstractQueue<X ex
 
     void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
     {
-        _queueConsumerManager.setInterest(consumer, desired);
-
-        if (desired)
-        {
-            _activeSubscriberCount.incrementAndGet();
-        }
-        else
+        if (_queueConsumerManager.setInterest(consumer, desired))
         {
-            _activeSubscriberCount.decrementAndGet();
+            if (desired)
+            {
+                _activeSubscriberCount.incrementAndGet();
+                notifyConsumer(consumer);
+            }
+            else
+            {
+                _activeSubscriberCount.decrementAndGet();
 
-            // iterate over interested and notify one as long as its priority is higher than any notified
-            final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
-            while(consumerIterator.hasNext())
-            {
-                QueueConsumer<?> queueConsumer = consumerIterator.next();
-                //TODO - break here if the consumer has lower priority than the highest notified (presuming iterator is priority ordered)
-                if(notifyConsumer(queueConsumer))
-                {
-                    break;
+                // iterate over interested and notify one as long as its priority is higher than any notified
+                final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+                while (consumerIterator.hasNext())
+                {
+                    QueueConsumer<?> queueConsumer = consumerIterator.next();
+                    //TODO - break here if the consumer has lower priority than the highest notified (presuming iterator is priority ordered)
+                    if (notifyConsumer(queueConsumer))
+                    {
+                        break;
+                    }
                 }
             }
-
         }
     }
 
@@ -976,6 +977,10 @@ public abstract class AbstractQueue<X ex
         consumer.setQueueContext(queueContext);
 
         _queueConsumerManager.addConsumer(consumer);
+        if (consumer.isNotifyWorkDesired())
+        {
+            _activeSubscriberCount.incrementAndGet();
+        }
 
         childAdded(consumer);
         consumer.addChangeListener(_deletedChildListener);
@@ -1938,16 +1943,19 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    void notifyConsumers()
+    void notifyOtherConsumers(final QueueConsumer<?> excludedConsumer)
     {
         final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
         while (interestedIterator.hasNext())
         {
             QueueConsumer<?> consumer = interestedIterator.next();
 
-            if(notifyConsumer(consumer))
+            if (excludedConsumer != consumer)
             {
-                break;
+                if (notifyConsumer(consumer))
+                {
+                    break;
+                }
             }
         }
     }
@@ -1974,6 +1982,15 @@ public abstract class AbstractQueue<X ex
                 {
                     queueEmpty = true;
                 }
+
+                if(messageContainer == null && consumer.acquires())
+                {
+                    // TODO - Should be only checking for available messages
+                    if(!isEmpty())
+                    {
+                        notifyOtherConsumers(consumer);
+                    }
+                }
             }
             else
             {
@@ -1989,16 +2006,8 @@ public abstract class AbstractQueue<X ex
             }
 
             consumer.flushBatched();
-
-        }
-        if(messageContainer == null && consumer.acquires())
-        {
-            // TODO - Should be only checking for available messages
-            if(!isEmpty())
-            {
-                notifyConsumers();
-            }
         }
+
         return messageContainer;
     }
 

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Tue Nov 15 09:42:20 2016
@@ -26,7 +26,7 @@ public interface QueueConsumerManager
 {
     void addConsumer(QueueConsumer<?> consumer);
     boolean removeConsumer(QueueConsumer<?> consumer);
-    /*public*/ void setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
+    /*public*/ boolean setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
     /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified); // called from Queue
 
     // should be priority and then insertion order

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Tue Nov 15 09:42:20 2016
@@ -101,29 +101,29 @@ public class QueueConsumerManagerImpl im
 
     // Set by the consumer always in the IO thread
     @Override
-    public void setInterest(final QueueConsumer consumer, final boolean interested)
+    public boolean setInterest(final QueueConsumer consumer, final boolean interested)
     {
         QueueConsumerNode node = consumer.getQueueConsumerNode();
         if (interested)
         {
             if (consumer.acquires())
             {
-                node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.INTERESTED);
+                return node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.INTERESTED);
             }
             else
             {
-                node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.NON_ACQUIRING);
+                return node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.NON_ACQUIRING);
             }
         }
         else
         {
             if (consumer.acquires())
             {
-                node.moveFromTo(EnumSet.of(NodeState.INTERESTED, NodeState.NOTIFIED), NodeState.NOT_INTERESTED);
+                return node.moveFromTo(EnumSet.of(NodeState.INTERESTED, NodeState.NOTIFIED), NodeState.NOT_INTERESTED);
             }
             else
             {
-                node.moveFromTo(EnumSet.of(NodeState.NON_ACQUIRING), NodeState.NOT_INTERESTED);
+                return node.moveFromTo(EnumSet.of(NodeState.NON_ACQUIRING), NodeState.NOT_INTERESTED);
             }
         }
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org