You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2016/11/16 21:36:27 UTC
svn commit: r1770060 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue: AbstractQueue.java QueueConsumerManager.java QueueConsumerManagerImpl.java QueueConsumerNode.java
Author: rgodfrey
Date: Wed Nov 16 21:36:27 2016
New Revision: 1770060
URL: http://svn.apache.org/viewvc?rev=1770060&view=rev
Log:
QPID-7514 : reduce the occurrences of consumers being removed and then immediately being reinserted into a list
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 16 21:36:27 2016
@@ -1880,19 +1880,20 @@ public abstract class AbstractQueue<X ex
boolean queueEmpty = false;
MessageContainer messageContainer = null;
- _queueConsumerManager.setNotified(consumer, false);
+ _queueConsumerManager.clearStateAffirmationFlag(consumer);
try
{
+
if (!consumer.isSuspended())
{
messageContainer = attemptDelivery(consumer);
- if(messageContainer.getMessageInstance() != null)
- {
- _queueConsumerManager.setNotified(consumer, true);
- }
- else
+
+
+ if(messageContainer.getMessageInstance() == null)
{
+ _queueConsumerManager.setNotified(consumer, false, true);
+
if (messageContainer.hasNoAvailableMessages())
{
queueEmpty = true;
@@ -2243,7 +2244,7 @@ public abstract class AbstractQueue<X ex
private boolean notifyConsumer(final QueueConsumer<?> consumer)
{
- if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true))
+ if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true, false))
{
consumer.notifyWork();
return true;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Wed Nov 16 21:36:27 2016
@@ -27,7 +27,7 @@ public interface QueueConsumerManager
void addConsumer(QueueConsumer<?> consumer);
boolean removeConsumer(QueueConsumer<?> consumer);
/*public*/ boolean setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
- /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified); // called from Queue
+ /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified, final boolean conditional); // called from Queue
// should be priority and then insertion order
Iterator<QueueConsumer<?>> getInterestedIterator();
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Wed Nov 16 21:36:27 2016
@@ -81,16 +81,16 @@ public class QueueConsumerManagerImpl im
{
if (consumer.acquires())
{
- node.moveFromTo(REMOVED, NodeState.INTERESTED);
+ node.moveFromTo(REMOVED, NodeState.INTERESTED, false);
}
else
{
- node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING);
+ node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING, false);
}
}
else
{
- node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED);
+ node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED, false);
}
_count++;
}
@@ -102,7 +102,7 @@ public class QueueConsumerManagerImpl im
removeFromAll(consumer);
QueueConsumerNode node = consumer.getQueueConsumerNode();
- if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED))
+ if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED, false))
{
_count--;
return true;
@@ -119,40 +119,46 @@ public class QueueConsumerManagerImpl im
{
if (consumer.acquires())
{
- return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED);
+ return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED, false);
}
else
{
- return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING);
+ return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING, false);
}
}
else
{
if (consumer.acquires())
{
- return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED);
+ return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED, false);
}
else
{
- return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED);
+ return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED, false);
}
}
}
+ public void clearStateAffirmationFlag(final QueueConsumer consumer)
+ {
+ QueueConsumerNode node = consumer.getQueueConsumerNode();
+ node.clearAffirmaion();
+ }
+
// Set by the Queue any IO thread
@Override
- public boolean setNotified(final QueueConsumer consumer, final boolean notified)
+ public boolean setNotified(final QueueConsumer consumer, final boolean notified, final boolean conditional)
{
QueueConsumerNode node = consumer.getQueueConsumerNode();
if (consumer.acquires())
{
if (notified)
{
- return node.moveFromTo(INTERESTED, NodeState.NOTIFIED);
+ return node.moveFromTo(INTERESTED, NodeState.NOTIFIED, conditional);
}
else
{
- return node.moveFromTo(NOTIFIED, NodeState.INTERESTED);
+ return node.moveFromTo(NOTIFIED, NodeState.INTERESTED, conditional);
}
}
else
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1770060&r1=1770059&r2=1770060&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java Wed Nov 16 21:36:27 2016
@@ -29,6 +29,7 @@ final class QueueConsumerNode
private QueueConsumerNodeListEntry _listEntry;
private QueueConsumerManagerImpl.NodeState _state = QueueConsumerManagerImpl.NodeState.REMOVED;
private QueueConsumerNodeListEntry _allEntry;
+ private boolean _affirmed;
QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, final QueueConsumer<?> queueConsumer)
{
@@ -46,20 +47,37 @@ final class QueueConsumerNode
return _state;
}
- public synchronized boolean moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates, QueueConsumerManagerImpl.NodeState toState)
+ public synchronized void clearAffirmaion()
+ {
+ _affirmed = false;
+ }
+
+
+ public synchronized boolean moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates,
+ QueueConsumerManagerImpl.NodeState toState,
+ final boolean conditional)
{
if (fromStates.contains(_state))
{
- if (_listEntry != null)
+ if(!conditional || !_affirmed)
+ {
+ if (_listEntry != null)
+ {
+ _listEntry.remove();
+ }
+ _state = toState;
+ _listEntry = _queueConsumerManager.addNodeToInterestList(this);
+ return true;
+ }
+ else
{
- _listEntry.remove();
+ _affirmed = false;
+ return false;
}
- _state = toState;
- _listEntry = _queueConsumerManager.addNodeToInterestList(this);
- return true;
}
else
{
+ _affirmed = _state == toState;
return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org