You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/15 11:19:12 UTC
svn commit: r1769780 - in
/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue:
AbstractQueue.java QueueConsumerManager.java QueueConsumerManagerImpl.java
Author: rgodfrey
Date: Tue Nov 15 11:19:12 2016
New Revision: 1769780
URL: http://svn.apache.org/viewvc?rev=1769780&view=rev
Log:
Optimisations to prevent unnecessary notification attempts
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769780&r1=1769779&r2=1769780&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 11:19:12 2016
@@ -281,11 +281,11 @@ public abstract class AbstractQueue<X ex
// iterate over interested and notify one as long as its priority is higher than any notified
final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+ final int highestNotifiedPriority = _queueConsumerManager.getHighestNotifiedPriority();
while (consumerIterator.hasNext())
{
QueueConsumer<?> queueConsumer = consumerIterator.next();
- //TODO - break here if the consumer has lower priority than the highest notified (presuming iterator is priority ordered)
- if (notifyConsumer(queueConsumer))
+ if (queueConsumer.getPriority() < highestNotifiedPriority || notifyConsumer(queueConsumer))
{
break;
}
@@ -1836,16 +1836,22 @@ public abstract class AbstractQueue<X ex
}
}
- // TODO need to take account of priority and potentially and existing notified
- // we don't want to notify lower priority consumers if there exists a consumer in the notified set
- // which can take the message (implies iterating such that you look at for each priority look at interested then at notified)
final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
while (entry.isAvailable() && interestedIterator.hasNext())
{
QueueConsumer<?> consumer = interestedIterator.next();
- if(consumer.hasInterest(entry) && notifyConsumer(consumer))
+ if(consumer.hasInterest(entry))
{
- break;
+ if(notifyConsumer(consumer))
+ {
+ break;
+ }
+ else if(!noHigherPriorityWithCredit(consumer, entry))
+ {
+ // there exists a higher priority consumer that would take this message, therefore no point in
+ // continuing to loop
+ break;
+ }
}
}
}
@@ -1853,7 +1859,7 @@ public abstract class AbstractQueue<X ex
void notifyOtherConsumers(final QueueConsumer<?> excludedConsumer)
{
final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
- while (interestedIterator.hasNext())
+ while (hasAvailableMessages() && interestedIterator.hasNext())
{
QueueConsumer<?> consumer = interestedIterator.next();
@@ -1892,8 +1898,7 @@ public abstract class AbstractQueue<X ex
if(messageContainer == null && consumer.acquires())
{
- // TODO - Should be only checking for available messages
- if(!isEmpty())
+ if(hasAvailableMessages())
{
notifyOtherConsumers(consumer);
}
@@ -1918,6 +1923,11 @@ public abstract class AbstractQueue<X ex
return messageContainer;
}
+ private boolean hasAvailableMessages()
+ {
+ return _queueStatistics.getAvailableCount() != 0;
+ }
+
public static class MessageContainer
{
public final MessageInstance _messageInstance;
@@ -2006,7 +2016,10 @@ public abstract class AbstractQueue<X ex
QueueConsumer<?> consumer = consumerIterator.next();
if(consumer.getPriority() > sub.getPriority())
{
- if(consumer.isNotifyWorkDesired() && consumer.hasInterest(queueEntry) && getNextAvailableEntry(consumer) != null)
+ if(consumer.isNotifyWorkDesired()
+ && consumer.acquires()
+ && consumer.hasInterest(queueEntry)
+ && getNextAvailableEntry(consumer) != null)
{
return false;
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1769780&r1=1769779&r2=1769780&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Tue Nov 15 11:19:12 2016
@@ -35,11 +35,10 @@ public interface QueueConsumerManager
Iterator<QueueConsumer<?>> getAllIterator();
Iterator<QueueConsumer<?>> getNonAcquiringIterator();
- Iterator<QueueConsumer<?>> getPrioritySortedNotifiedOrInterestedIterator();
-
int getAllSize();
// int getInterestedSize();
int getNotifiedAcquiringSize();
+ int getHighestNotifiedPriority();
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769780&r1=1769779&r2=1769780&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Tue Nov 15 11:19:12 2016
@@ -179,12 +179,6 @@ public class QueueConsumerManagerImpl im
}
@Override
- public Iterator<QueueConsumer<?>> getPrioritySortedNotifiedOrInterestedIterator()
- {
- return null;
- }
-
- @Override
public int getAllSize()
{
return _count;
@@ -196,6 +190,22 @@ public class QueueConsumerManagerImpl im
return _notified.size();
}
+ @Override
+ public int getHighestNotifiedPriority()
+ {
+ final Iterator<QueueConsumerNode> notifiedIterator =
+ new PrioritisedQueueConsumerNodeIterator(_notified);
+ if(notifiedIterator.hasNext())
+ {
+ final QueueConsumerNode queueConsumerNode = notifiedIterator.next();
+ return queueConsumerNode.getQueueConsumer().getPriority();
+ }
+ else
+ {
+ return Integer.MIN_VALUE;
+ }
+ }
+
QueueConsumerNodeListEntry addNodeToInterestList(final QueueConsumerNode queueConsumerNode)
{
QueueConsumerNodeListEntry newListEntry;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org