You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/15 09:42:20 UTC
svn commit: r1769763 - in
/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server:
consumer/AbstractConsumerTarget.java queue/AbstractQueue.java
queue/QueueConsumerManager.java queue/QueueConsumerManagerImpl.java
Author: lquack
Date: Tue Nov 15 09:42:20 2016
New Revision: 1769763
URL: http://svn.apache.org/viewvc?rev=1769763&view=rev
Log:
fix spurious consumer wakeups
Modified:
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Tue Nov 15 09:42:20 2016
@@ -115,17 +115,12 @@ public abstract class AbstractConsumerTa
}
}
- // TODO - remove once queue is smarter
for (ConsumerImpl consumer : _consumers)
{
consumer.setNotifyWorkDesired(desired);
}
_notifyWorkDesired = desired;
- if (desired)
- {
- notifyWork();
- }
}
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Nov 15 09:42:20 2016
@@ -283,28 +283,29 @@ public abstract class AbstractQueue<X ex
void setNotifyWorkDesired(final QueueConsumer consumer, final boolean desired)
{
- _queueConsumerManager.setInterest(consumer, desired);
-
- if (desired)
- {
- _activeSubscriberCount.incrementAndGet();
- }
- else
+ if (_queueConsumerManager.setInterest(consumer, desired))
{
- _activeSubscriberCount.decrementAndGet();
+ if (desired)
+ {
+ _activeSubscriberCount.incrementAndGet();
+ notifyConsumer(consumer);
+ }
+ else
+ {
+ _activeSubscriberCount.decrementAndGet();
- // iterate over interested and notify one as long as its priority is higher than any notified
- final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
- while(consumerIterator.hasNext())
- {
- QueueConsumer<?> queueConsumer = consumerIterator.next();
- //TODO - break here if the consumer has lower priority than the highest notified (presuming iterator is priority ordered)
- if(notifyConsumer(queueConsumer))
- {
- break;
+ // iterate over interested and notify one as long as its priority is higher than any notified
+ final Iterator<QueueConsumer<?>> consumerIterator = _queueConsumerManager.getInterestedIterator();
+ while (consumerIterator.hasNext())
+ {
+ QueueConsumer<?> queueConsumer = consumerIterator.next();
+ //TODO - break here if the consumer has lower priority than the highest notified (presuming iterator is priority ordered)
+ if (notifyConsumer(queueConsumer))
+ {
+ break;
+ }
}
}
-
}
}
@@ -976,6 +977,10 @@ public abstract class AbstractQueue<X ex
consumer.setQueueContext(queueContext);
_queueConsumerManager.addConsumer(consumer);
+ if (consumer.isNotifyWorkDesired())
+ {
+ _activeSubscriberCount.incrementAndGet();
+ }
childAdded(consumer);
consumer.addChangeListener(_deletedChildListener);
@@ -1938,16 +1943,19 @@ public abstract class AbstractQueue<X ex
}
}
- void notifyConsumers()
+ void notifyOtherConsumers(final QueueConsumer<?> excludedConsumer)
{
final Iterator<QueueConsumer<?>> interestedIterator = _queueConsumerManager.getInterestedIterator();
while (interestedIterator.hasNext())
{
QueueConsumer<?> consumer = interestedIterator.next();
- if(notifyConsumer(consumer))
+ if (excludedConsumer != consumer)
{
- break;
+ if (notifyConsumer(consumer))
+ {
+ break;
+ }
}
}
}
@@ -1974,6 +1982,15 @@ public abstract class AbstractQueue<X ex
{
queueEmpty = true;
}
+
+ if(messageContainer == null && consumer.acquires())
+ {
+ // TODO - Should be only checking for available messages
+ if(!isEmpty())
+ {
+ notifyOtherConsumers(consumer);
+ }
+ }
}
else
{
@@ -1989,16 +2006,8 @@ public abstract class AbstractQueue<X ex
}
consumer.flushBatched();
-
- }
- if(messageContainer == null && consumer.acquires())
- {
- // TODO - Should be only checking for available messages
- if(!isEmpty())
- {
- notifyConsumers();
- }
}
+
return messageContainer;
}
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Tue Nov 15 09:42:20 2016
@@ -26,7 +26,7 @@ public interface QueueConsumerManager
{
void addConsumer(QueueConsumer<?> consumer);
boolean removeConsumer(QueueConsumer<?> consumer);
- /*public*/ void setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
+ /*public*/ boolean setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
/*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified); // called from Queue
// should be priority and then insertion order
Modified: qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1769763&r1=1769762&r2=1769763&view=diff
==============================================================================
--- qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/branches/remove-queue-runner/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Tue Nov 15 09:42:20 2016
@@ -101,29 +101,29 @@ public class QueueConsumerManagerImpl im
// Set by the consumer always in the IO thread
@Override
- public void setInterest(final QueueConsumer consumer, final boolean interested)
+ public boolean setInterest(final QueueConsumer consumer, final boolean interested)
{
QueueConsumerNode node = consumer.getQueueConsumerNode();
if (interested)
{
if (consumer.acquires())
{
- node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.INTERESTED);
+ return node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.INTERESTED);
}
else
{
- node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.NON_ACQUIRING);
+ return node.moveFromTo(NodeState.NOT_INTERESTED, NodeState.NON_ACQUIRING);
}
}
else
{
if (consumer.acquires())
{
- node.moveFromTo(EnumSet.of(NodeState.INTERESTED, NodeState.NOTIFIED), NodeState.NOT_INTERESTED);
+ return node.moveFromTo(EnumSet.of(NodeState.INTERESTED, NodeState.NOTIFIED), NodeState.NOT_INTERESTED);
}
else
{
- node.moveFromTo(EnumSet.of(NodeState.NON_ACQUIRING), NodeState.NOT_INTERESTED);
+ return node.moveFromTo(EnumSet.of(NodeState.NON_ACQUIRING), NodeState.NOT_INTERESTED);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org