You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 11:19:12 UTC

svn commit: r1769780 - in /qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue: AbstractQueue.java QueueConsumerManager.java QueueConsumerManagerImpl.java

Author: rgodfrey
Date: Tue Nov 15 11:19:12 2016
New Revision: 1769780

URL: http://svn.apache.org/viewvc?rev=1769780&view=rev
Log:
Optimisations to prevent unnecessary notification attempts

Modified:
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
    qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769780&r1=1769779&r2=1769780&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 11:19:12 2016
@@ -281,11 +281,11 @@ public abstract class AbstractQueue<X ex
 
                 // iterate over interested and notify one as long as its priority is higher than any notified
                 final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+                final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
                 while (consumerIterator.hasNext())
                 {
                     QueueConsumer<?> queueConsumer = consumerIterator.next();
-                    //TODO - break here if the consumer has lower priority than the highest notified (presuming iterator is priority ordered)
-                    if (notifyConsumer(queueConsumer))
+                    if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
                     {
                         break;
                     }
@@ -1836,16 +1836,22 @@ public abstract class AbstractQueue<X ex
             }
         }
 
-        // TODO need to take account of priority and potentially and existing notified
-        // we don't want to notify lower priority consumers if there exists a consumer in the notified set
-        // which can take the message (implies iterating such that you look at for each priority look at interested then at notified)
         final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
         while (entry.isAvailable() && interestedIterator.hasNext())
         {
             QueueConsumer<?> consumer = interestedIterator.next();
-            if(consumer.hasInterest(entry) && notifyConsumer(consumer))
+            if(consumer.hasInterest(entry))
             {
-                break;
+                if(notifyConsumer(consumer))
+                {
+                    break;
+                }
+                else if(!noHigherPriorityWithCredit(consumer, entry))
+                {
+                    // there exists a higher priority consumer that would take this message, therefore no point in
+                    // continuing to loop
+                    break;
+                }
             }
         }
     }
@@ -1853,7 +1859,7 @@ public abstract class AbstractQueue<X ex
     void notifyOtherConsumers(final QueueConsumer<?> excludedConsumer)
     {
         final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
-        while (interestedIterator.hasNext())
+        while (hasAvailableMessages() && interestedIterator.hasNext())
         {
             QueueConsumer<?> consumer = interestedIterator.next();
 
@@ -1892,8 +1898,7 @@ public abstract class AbstractQueue<X ex
 
                 if(messageContainer == null && consumer.acquires())
                 {
-                    // TODO - Should be only checking for available messages
-                    if(!isEmpty())
+                    if(hasAvailableMessages())
                     {
                         notifyOtherConsumers(consumer);
                     }
@@ -1918,6 +1923,11 @@ public abstract class AbstractQueue<X ex
         return messageContainer;
     }
 
+    private boolean hasAvailableMessages()
+    {
+        return _queueStatistics.getAvailableCount() != 0;
+    }
+
     public static class MessageContainer
     {
         public final MessageInstance _messageInstance;
@@ -2006,7 +2016,10 @@ public abstract class AbstractQueue<X ex
             QueueConsumer<?> consumer = consumerIterator.next();
             if(consumer.getPriority() > sub.getPriority())
             {
-                if(consumer.isNotifyWorkDesired() && consumer.hasInterest(queueEntry) && getNextAvailableEntry(consumer) != null)
+                if(consumer.isNotifyWorkDesired()
+                   && consumer.acquires()
+                   && consumer.hasInterest(queueEntry)
+                   && getNextAvailableEntry(consumer) != null)
                 {
                     return false;
                 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1769780&r1=1769779&r2=1769780&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Tue Nov 15 11:19:12 2016
@@ -35,11 +35,10 @@ public interface QueueConsumerManager
     Iterator<QueueConsumer<?>> getAllIterator();
     Iterator<QueueConsumer<?>> getNonAcquiringIterator();
 
-    Iterator<QueueConsumer<?>> getPrioritySortedNotifiedOrInterestedIterator();
-
     int getAllSize();
     //        int getInterestedSize();
     int getNotifiedAcquiringSize();
 
 
+    int getHighestNotifiedPriority();
 }

Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769780&r1=1769779&r2=1769780&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Tue Nov 15 11:19:12 2016
@@ -179,12 +179,6 @@ public class QueueConsumerManagerImpl im
     }
 
     @Override
-    public Iterator<QueueConsumer<?>> getPrioritySortedNotifiedOrInterestedIterator()
-    {
-        return null;
-    }
-
-    @Override
     public int getAllSize()
     {
         return _count;
@@ -196,6 +190,22 @@ public class QueueConsumerManagerImpl im
         return _notified.size();
     }
 
+    @Override
+    public int getHighestNotifiedPriority()
+    {
+        final Iterator<QueueConsumerNode> notifiedIterator =
+                new PrioritisedQueueConsumerNodeIterator(_notified);
+        if(notifiedIterator.hasNext())
+        {
+            final QueueConsumerNode queueConsumerNode = notifiedIterator.next();
+            return queueConsumerNode.getQueueConsumer().getPriority();
+        }
+        else
+        {
+            return Integer.MIN_VALUE;
+        }
+    }
+
     QueueConsumerNodeListEntry addNodeToInterestList(final QueueConsumerNode queueConsumerNode)
     {
         QueueConsumerNodeListEntry newListEntry;



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org