You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/11/23 14:27:39 UTC
svn commit: r1770975 - in /qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue: AbstractQueue.java QueueConsumerManager.java QueueConsumerManagerImpl.java QueueConsumerNode.java
Author: lquack
Date: Wed Nov 23 14:27:38 2016
New Revision: 1770975
URL: http://svn.apache.org/viewvc?rev=1770975&view=rev
Log:
QPID-7514: [Java Broker] remove affirmation logic from QueueConsumerManager
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Wed Nov 23 14:27:38 2016
@@ -1861,8 +1861,7 @@ public abstract class AbstractQueue<X ex
boolean queueEmpty = false;
MessageContainer messageContainer = null;
- _queueConsumerManager.clearStateAffirmationFlag(consumer);
-
+ _queueConsumerManager.setNotified(consumer, false);
try
{
@@ -1870,11 +1869,8 @@ public abstract class AbstractQueue<X ex
{
messageContainer = attemptDelivery(consumer);
-
if(messageContainer.getMessageInstance() == null)
{
- _queueConsumerManager.setNotified(consumer, false, true);
-
if (messageContainer.hasNoAvailableMessages())
{
queueEmpty = true;
@@ -1889,6 +1885,10 @@ public abstract class AbstractQueue<X ex
}
messageContainer = null;
}
+ else
+ {
+ _queueConsumerManager.setNotified(consumer, true);
+ }
}
else
{
@@ -2225,7 +2225,7 @@ public abstract class AbstractQueue<X ex
private boolean notifyConsumer(final QueueConsumer<?> consumer)
{
- if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true, false))
+ if(consumerHasAvailableMessages(consumer) && _queueConsumerManager.setNotified(consumer, true))
{
consumer.notifyWork();
return true;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManager.java Wed Nov 23 14:27:38 2016
@@ -26,19 +26,13 @@ public interface QueueConsumerManager
{
void addConsumer(QueueConsumer<?> consumer);
boolean removeConsumer(QueueConsumer<?> consumer);
- /*public*/ boolean setInterest(QueueConsumer<?> consumer, boolean interested); // called from Consumer
- /*private*/ boolean setNotified(QueueConsumer<?> consumer, boolean notified, final boolean conditional); // called from Queue
+ boolean setInterest(QueueConsumer<?> consumer, boolean interested);
+ boolean setNotified(QueueConsumer<?> consumer, boolean notified);
- // should be priority and then insertion order
Iterator<QueueConsumer<?>> getInterestedIterator();
-
Iterator<QueueConsumer<?>> getAllIterator();
Iterator<QueueConsumer<?>> getNonAcquiringIterator();
int getAllSize();
- // int getInterestedSize();
- int getNotifiedAcquiringSize();
-
-
int getHighestNotifiedPriority();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerManagerImpl.java Wed Nov 23 14:27:38 2016
@@ -81,16 +81,16 @@ public class QueueConsumerManagerImpl im
{
if (consumer.acquires())
{
- node.moveFromTo(REMOVED, NodeState.INTERESTED, false);
+ node.moveFromTo(REMOVED, NodeState.INTERESTED);
}
else
{
- node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING, false);
+ node.moveFromTo(REMOVED, NodeState.NON_ACQUIRING);
}
}
else
{
- node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED, false);
+ node.moveFromTo(REMOVED, NodeState.NOT_INTERESTED);
}
_count++;
}
@@ -102,7 +102,7 @@ public class QueueConsumerManagerImpl im
removeFromAll(consumer);
QueueConsumerNode node = consumer.getQueueConsumerNode();
- if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED, false))
+ if (node.moveFromTo(STATES_OTHER_THAN_REMOVED, NodeState.REMOVED))
{
_count--;
return true;
@@ -119,46 +119,40 @@ public class QueueConsumerManagerImpl im
{
if (consumer.acquires())
{
- return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED, false);
+ return node.moveFromTo(NOT_INTERESTED, NodeState.INTERESTED);
}
else
{
- return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING, false);
+ return node.moveFromTo(NOT_INTERESTED, NodeState.NON_ACQUIRING);
}
}
else
{
if (consumer.acquires())
{
- return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED, false);
+ return node.moveFromTo(EITHER_INTERESTED_OR_NOTIFIED, NodeState.NOT_INTERESTED);
}
else
{
- return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED, false);
+ return node.moveFromTo(NON_ACQUIRING, NodeState.NOT_INTERESTED);
}
}
}
- public void clearStateAffirmationFlag(final QueueConsumer consumer)
- {
- QueueConsumerNode node = consumer.getQueueConsumerNode();
- node.clearAffirmation();
- }
-
// Set by the Queue, from any IO thread
@Override
- public boolean setNotified(final QueueConsumer consumer, final boolean notified, final boolean conditional)
+ public boolean setNotified(final QueueConsumer consumer, final boolean notified)
{
QueueConsumerNode node = consumer.getQueueConsumerNode();
if (consumer.acquires())
{
if (notified)
{
- return node.moveFromTo(INTERESTED, NodeState.NOTIFIED, conditional);
+ return node.moveFromTo(INTERESTED, NodeState.NOTIFIED);
}
else
{
- return node.moveFromTo(NOTIFIED, NodeState.INTERESTED, conditional);
+ return node.moveFromTo(NOTIFIED, NodeState.INTERESTED);
}
}
else
@@ -192,12 +186,6 @@ public class QueueConsumerManagerImpl im
}
@Override
- public int getNotifiedAcquiringSize()
- {
- return _notified.size();
- }
-
- @Override
public int getHighestNotifiedPriority()
{
final Iterator<QueueConsumerNode> notifiedIterator =
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java?rev=1770975&r1=1770974&r2=1770975&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerNode.java Wed Nov 23 14:27:38 2016
@@ -29,7 +29,6 @@ final class QueueConsumerNode
private QueueConsumerNodeListEntry _listEntry;
private QueueConsumerManagerImpl.NodeState _state = QueueConsumerManagerImpl.NodeState.REMOVED;
private QueueConsumerNodeListEntry _allEntry;
- private boolean _affirmed;
QueueConsumerNode(final QueueConsumerManagerImpl queueConsumerManager, final QueueConsumer<?> queueConsumer)
{
@@ -47,37 +46,21 @@ final class QueueConsumerNode
return _state;
}
- public synchronized void clearAffirmation()
- {
- _affirmed = false;
- }
-
-
public synchronized boolean moveFromTo(Collection<QueueConsumerManagerImpl.NodeState> fromStates,
- QueueConsumerManagerImpl.NodeState toState,
- final boolean conditional)
+ QueueConsumerManagerImpl.NodeState toState)
{
if (fromStates.contains(_state))
{
- if(!conditional || !_affirmed)
- {
- if (_listEntry != null)
- {
- _listEntry.remove();
- }
- _state = toState;
- _listEntry = _queueConsumerManager.addNodeToInterestList(this);
- return true;
- }
- else
+ if (_listEntry != null)
{
- _affirmed = false;
- return false;
+ _listEntry.remove();
}
+ _state = toState;
+ _listEntry = _queueConsumerManager.addNodeToInterestList(this);
+ return true;
}
else
{
- _affirmed = _state == toState;
return false;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org