You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2008/07/15 13:42:56 UTC
svn commit: r676887 -
/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Author: ritchiem
Date: Tue Jul 15 04:42:55 2008
New Revision: 676887
URL: http://svn.apache.org/viewvc?rev=676887&view=rev
Log:
QPID-1172: Moved subscription unregistration out of the sendLock. A refactor to share logic between processQueue and flushSubscription may be possible.
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=676887&r1=676886&r2=676887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Tue Jul 15 04:42:55 2008
@@ -82,16 +82,12 @@
private volatile Subscription _exclusiveSubscriber;
-
private final QueueEntryList _entries;
-
private final AMQQueueMBean _managedObject;
private final Executor _asyncDelivery;
private final AtomicLong _totalMessagesReceived = new AtomicLong();
-
-
/** max allowed size(KB) of a single message */
@Configured(path = "maximumMessageSize", defaultValue = "0")
public long _maximumMessageSize;
@@ -112,14 +108,10 @@
@Configured(path = "minimumAlertRepeatGap", defaultValue = "0")
public long _minimumAlertRepeatGap;
-
-
private static final int MAX_ASYNC_DELIVERIES = 10;
-
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
-
private final AtomicLong _stateChangeCount = new AtomicLong(Long.MIN_VALUE);
private AtomicReference _asynchronousRunner = new AtomicReference(null);
private AtomicInteger _deliveredMessages = new AtomicInteger();
@@ -127,7 +119,7 @@
protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost)
throws AMQException
{
- this(name,durable,owner,autoDelete,virtualHost,new SimpleQueueEntryList.Factory());
+ this(name, durable, owner, autoDelete, virtualHost, new SimpleQueueEntryList.Factory());
}
protected SimpleAMQQueue(AMQShortString name,
@@ -136,7 +128,7 @@
boolean autoDelete,
VirtualHost virtualHost,
QueueEntryListFactory entryListFactory)
- throws AMQException
+ throws AMQException
{
if (name == null)
@@ -168,7 +160,6 @@
throw new AMQException("AMQQueue MBean creation has failed ", e);
}
-
// This ensure that the notification checks for the configured alerts are created.
setMaximumMessageAge(_maximumMessageAge);
setMaximumMessageCount(_maximumMessageCount);
@@ -204,7 +195,6 @@
return _virtualHost;
}
-
// ------ bind and unbind
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
@@ -227,27 +217,25 @@
}
boolean removed = _bindings.remove(routingKey, arguments, exchange);
- if(!removed)
+ if (!removed)
{
_logger.error("Mismatch between queue bindings and exchange record of bindings");
}
}
-
// ------ Manage Subscriptions
public synchronized void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException
{
-
- if(isExclusiveSubscriber())
+ if (isExclusiveSubscriber())
{
throw new ExistingExclusiveSubscription();
}
- if(exclusive)
+ if (exclusive)
{
- if(getConsumerCount() != 0)
+ if (getConsumerCount() != 0)
{
throw new ExistingSubscriptionPreventsExclusive();
}
@@ -258,16 +246,15 @@
}
}
-
_activeSubscriberCount.incrementAndGet();
subscription.setStateListener(this);
- subscription.setLastSeenEntry(null,_entries.getHead());
+ subscription.setLastSeenEntry(null, _entries.getHead());
- if(!isDeleted())
+ if (!isDeleted())
{
subscription.setQueue(this);
_subscriptionList.add(subscription);
- if(isDeleted())
+ if (isDeleted())
{
subscription.queueDeleted(this);
}
@@ -277,39 +264,32 @@
// TODO
}
-
deliverAsync(subscription);
}
public synchronized void unregisterSubscription(final Subscription subscription) throws AMQException
{
- if(subscription == null)
+ if (subscription == null)
{
throw new NullPointerException("subscription argument is null");
}
boolean removed = _subscriptionList.remove(subscription);
-
-
- if(removed)
+ if (removed)
{
subscription.close();
// No longer can the queue have an exclusive consumer
setExclusiveSubscriber(null);
-
QueueEntry lastSeen;
- while((lastSeen = subscription.getLastSeenEntry()) != null)
+ while ((lastSeen = subscription.getLastSeenEntry()) != null)
{
subscription.setLastSeenEntry(lastSeen, null);
}
-
-
-
// auto-delete queues must be deleted if there are no remaining subscribers
if (_autoDelete && getConsumerCount() == 0)
@@ -324,30 +304,25 @@
// we need to manually fire the event to the removed subscription (which was the last one left for this
// queue. This is because the delete method uses the subscription set which has just been cleared
subscription.queueDeleted(this);
- }
+ }
}
-
}
-
// ------ Enqueue / Dequeue
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
{
-
-
incrementQueueCount();
incrementQueueSize(message);
_totalMessagesReceived.incrementAndGet();
-
- QueueEntry entry;
+ QueueEntry entry;
Subscription exclusiveSub = _exclusiveSubscriber;
- if(exclusiveSub != null)
+ if (exclusiveSub != null)
{
exclusiveSub.getSendLock();
@@ -357,11 +332,10 @@
deliverToSubscription(exclusiveSub, entry);
-
// where there is more than one producer there's a reasonable chance that even though there is
// no "queueing" we do not deliver because we get an interleving of _entries.add and
// deliverToSubscription between threads. Therefore have one more try.
- if(!(entry.isAcquired() || entry.isDeleted()))
+ if (!(entry.isAcquired() || entry.isDeleted()))
{
deliverToSubscription(exclusiveSub, entry);
}
@@ -381,13 +355,13 @@
*/
SubscriptionList.SubscriptionNode node = _lastSubscriptionNode.get();
SubscriptionList.SubscriptionNode nextNode = node.getNext();
- if(nextNode == null)
+ if (nextNode == null)
{
nextNode = _subscriptionList.getHead().getNext();
}
- while(nextNode != null)
+ while (nextNode != null)
{
- if(_lastSubscriptionNode.compareAndSet(node, nextNode))
+ if (_lastSubscriptionNode.compareAndSet(node, nextNode))
{
break;
}
@@ -395,21 +369,20 @@
{
node = _lastSubscriptionNode.get();
nextNode = node.getNext();
- if(nextNode == null)
+ if (nextNode == null)
{
nextNode = _subscriptionList.getHead().getNext();
}
}
}
-
// always do one extra loop after we believe we've finished
// this catches the case where we *just* miss an update
int loops = 2;
- while(!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
+ while (!(entry.isAcquired() || entry.isDeleted()) && loops != 0)
{
- if(nextNode == null)
+ if (nextNode == null)
{
loops--;
nextNode = _subscriptionList.getHead();
@@ -425,20 +398,18 @@
}
}
-
- if(entry.immediateAndNotDelivered())
+ if (entry.immediateAndNotDelivered())
{
dequeue(storeContext, entry);
entry.dispose(storeContext);
}
- else if(!(entry.isAcquired() || entry.isDeleted()))
+ else if (!(entry.isAcquired() || entry.isDeleted()))
{
checkSubscriptionsNotAheadOfDelivery(entry);
deliverAsync();
}
-
try
{
_managedObject.checkForNotification(entry.getMessage());
@@ -448,7 +419,6 @@
throw new AMQException("Unable to get notification from manage queue: " + e, e);
}
-
return entry;
}
@@ -460,12 +430,12 @@
sub.getSendLock();
try
{
- if(subscriptionReadyAndHasInterest(sub, entry)
- && !sub.isSuspended())
+ if (subscriptionReadyAndHasInterest(sub, entry)
+ && !sub.isSuspended())
{
- if( !sub.wouldSuspend(entry))
+ if (!sub.wouldSuspend(entry))
{
- if(!sub.isBrowser() && !entry.acquire(sub))
+ if (!sub.isBrowser() && !entry.acquire(sub))
{
// restore credit here that would have been taken away by wouldSuspend since we didn't manage
// to acquire the entry for this subscription
@@ -516,11 +486,11 @@
// We need to move this subscription on, past entries which are already acquired, or deleted or ones it has no
// interest in.
QueueEntry node = sub.getLastSeenEntry();
- while(node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)) )
+ while (node != null && (node.isAcquired() || node.isDeleted() || !sub.hasInterest(node)))
{
QueueEntry newNode = _entries.next(node);
- if(newNode != null)
+ if (newNode != null)
{
sub.setLastSeenEntry(node, newNode);
node = sub.getLastSeenEntry();
@@ -533,8 +503,7 @@
}
-
- if(node == entry)
+ if (node == entry)
{
// If the first entry that subscription can process is the one we are trying to deliver to it, then we are
// good
@@ -555,11 +524,11 @@
{
QueueEntry node = sub.getLastSeenEntry();
- if(node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
+ if (node != null && entry.compareTo(node) < 0 && sub.hasInterest(entry))
{
do
{
- if(sub.setLastSeenEntry(node,entry))
+ if (sub.setLastSeenEntry(node, entry))
{
return;
}
@@ -567,7 +536,8 @@
{
node = sub.getLastSeenEntry();
}
- } while (node != null && entry.compareTo(node) < 0);
+ }
+ while (node != null && entry.compareTo(node) < 0);
}
}
@@ -577,28 +547,26 @@
SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
// iterate over all the subscribers, and if they are in advance of this queue entry then move them backwards
- while(subscriberIter.advance())
+ while (subscriberIter.advance())
{
Subscription sub = subscriberIter.getNode().getSubscription();
// we don't make browsers send the same stuff twice
- if(!sub.isBrowser())
+ if (!sub.isBrowser())
{
updateLastSeenEntry(sub, entry);
}
}
-
deliverAsync();
-
}
public void dequeue(StoreContext storeContext, QueueEntry entry) throws FailedDequeueException
{
decrementQueueCount();
decrementQueueSize(entry);
- if(entry.acquiredBySubscription())
+ if (entry.acquiredBySubscription())
{
_deliveredMessages.decrementAndGet();
}
@@ -606,7 +574,7 @@
try
{
AMQMessage msg = entry.getMessage();
- if(isDurable() && msg.isPersistent())
+ if (isDurable() && msg.isPersistent())
{
_virtualHost.getMessageStore().dequeueMessage(storeContext, this, msg.getMessageId());
}
@@ -626,7 +594,6 @@
throw new FailedDequeueException(_name.toString(), e);
}
-
}
private void decrementQueueSize(final QueueEntry entry)
@@ -647,7 +614,7 @@
subscription.getSendLock();
try
{
- if(!subscription.isClosed())
+ if (!subscription.isClosed())
{
deliverMessage(subscription, entry);
return true;
@@ -663,10 +630,6 @@
}
}
-
-
-
-
public int getConsumerCount()
{
return _subscriptionList.size();
@@ -700,7 +663,7 @@
public int getUndeliveredMessageCount()
{
int count = getMessageCount() - _deliveredMessages.get();
- if(count < 0)
+ if (count < 0)
{
return 0;
}
@@ -710,7 +673,6 @@
}
}
-
public long getReceivedMessageCount()
{
return _totalMessagesReceived.get();
@@ -732,16 +694,14 @@
return _deleted.get();
}
-
-
public List<QueueEntry> getMessagesOnTheQueue()
{
ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
QueueEntryIterator queueListIterator = _entries.iterator();
- while(queueListIterator.advance())
+ while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if(node != null && !node.isDeleted())
+ if (node != null && !node.isDeleted())
{
entryList.add(node);
}
@@ -752,14 +712,14 @@
public void stateChange(Subscription sub, Subscription.State oldState, Subscription.State newState)
{
- if(oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
+ if (oldState == Subscription.State.ACTIVE && newState != Subscription.State.ACTIVE)
{
_activeSubscriberCount.decrementAndGet();
}
- else if(newState == Subscription.State.ACTIVE)
- {
- if(oldState != Subscription.State.ACTIVE)
+ else if (newState == Subscription.State.ACTIVE)
+ {
+ if (oldState != Subscription.State.ACTIVE)
{
_activeSubscriberCount.incrementAndGet();
@@ -800,55 +760,52 @@
public boolean filterComplete();
}
-
-
public List<QueueEntry> getMessagesOnTheQueue(final long fromMessageId, final long toMessageId)
{
return getMessagesOnTheQueue(new QueueEntryFilter()
- {
+ {
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageId();
- return messageId >= fromMessageId && messageId <= toMessageId;
- }
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ return messageId >= fromMessageId && messageId <= toMessageId;
+ }
- public boolean filterComplete()
- {
- return false;
- }
- });
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
}
public QueueEntry getMessageOnTheQueue(final long messageId)
{
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
- private boolean _complete;
+ {
+ private boolean _complete;
- public boolean accept(QueueEntry entry)
- {
- _complete = entry.getMessage().getMessageId() == messageId;
- return _complete;
- }
+ public boolean accept(QueueEntry entry)
+ {
+ _complete = entry.getMessage().getMessageId() == messageId;
+ return _complete;
+ }
- public boolean filterComplete()
- {
- return _complete;
- }
- });
+ public boolean filterComplete()
+ {
+ return _complete;
+ }
+ });
return entries.isEmpty() ? null : entries.get(0);
}
-
public List<QueueEntry> getMessagesOnTheQueue(QueueEntryFilter filter)
{
ArrayList<QueueEntry> entryList = new ArrayList<QueueEntry>();
QueueEntryIterator queueListIterator = _entries.iterator();
- while(queueListIterator.advance() && !filter.filterComplete())
+ while (queueListIterator.advance() && !filter.filterComplete())
{
QueueEntry node = queueListIterator.getNode();
- if(!node.isDeleted() && filter.accept(node))
+ if (!node.isDeleted() && filter.accept(node))
{
entryList.add(node);
}
@@ -857,7 +814,6 @@
}
-
public void moveMessagesToAnotherQueue(final long fromMessageId,
final long toMessageId,
String queueName,
@@ -867,24 +823,22 @@
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
MessageStore store = getVirtualHost().getMessageStore();
-
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
-
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageId();
- return (messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && entry.acquire();
- }
+ {
- public boolean filterComplete()
- {
- return false;
- }
- });
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ return (messageId >= fromMessageId)
+ && (messageId <= toMessageId)
+ && entry.acquire();
+ }
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
try
{
@@ -895,7 +849,7 @@
{
AMQMessage message = entry.getMessage();
- if(message.isPersistent() && toQueue.isDurable())
+ if (message.isPersistent() && toQueue.isDurable())
{
store.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
@@ -943,7 +897,6 @@
throw new RuntimeException(e);
}
-
}
public void copyMessagesToAnotherQueue(final long fromMessageId,
@@ -954,30 +907,29 @@
AMQQueue toQueue = getVirtualHost().getQueueRegistry().getQueue(new AMQShortString(queueName));
MessageStore store = getVirtualHost().getMessageStore();
-
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
- {
+ {
- public boolean accept(QueueEntry entry)
- {
- final long messageId = entry.getMessage().getMessageId();
- if((messageId >= fromMessageId)
- && (messageId <= toMessageId))
- {
- if(!entry.isDeleted())
- {
- return entry.getMessage().incrementReference();
- }
- }
+ public boolean accept(QueueEntry entry)
+ {
+ final long messageId = entry.getMessage().getMessageId();
+ if ((messageId >= fromMessageId)
+ && (messageId <= toMessageId))
+ {
+ if (!entry.isDeleted())
+ {
+ return entry.getMessage().incrementReference();
+ }
+ }
- return false;
- }
+ return false;
+ }
- public boolean filterComplete()
- {
- return false;
- }
- });
+ public boolean filterComplete()
+ {
+ return false;
+ }
+ });
try
{
@@ -988,7 +940,7 @@
{
AMQMessage message = entry.getMessage();
- if(message.isReferenced() && message.isPersistent() && toQueue.isDurable())
+ if (message.isReferenced() && message.isPersistent() && toQueue.isDurable())
{
store.enqueueMessage(storeContext, toQueue, message.getMessageId());
}
@@ -1021,7 +973,7 @@
{
for (QueueEntry entry : entries)
{
- if(entry.getMessage().isReferenced())
+ if (entry.getMessage().isReferenced())
{
toQueue.enqueue(storeContext, entry.getMessage());
}
@@ -1036,7 +988,6 @@
throw new RuntimeException(e);
}
-
}
public void removeMessagesFromQueue(long fromMessageId, long toMessageId, StoreContext storeContext)
@@ -1046,17 +997,16 @@
{
QueueEntryIterator queueListIterator = _entries.iterator();
-
- while(queueListIterator.advance())
+ while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
final long messageId = node.getMessage().getMessageId();
- if((messageId >= fromMessageId)
- && (messageId <= toMessageId)
- && !node.isDeleted()
- && node.acquire())
+ if ((messageId >= fromMessageId)
+ && (messageId <= toMessageId)
+ && !node.isDeleted()
+ && node.acquire())
{
node.discard(storeContext);
}
@@ -1072,16 +1022,15 @@
// ------ Management functions
-
public void deleteMessageFromTop(StoreContext storeContext) throws AMQException
{
QueueEntryIterator queueListIterator = _entries.iterator();
boolean noDeletes = true;
- while(noDeletes && queueListIterator.advance() )
+ while (noDeletes && queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if(!node.isDeleted() && node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
node.discard(storeContext);
noDeletes = false;
@@ -1096,10 +1045,10 @@
QueueEntryIterator queueListIterator = _entries.iterator();
long count = 0;
- while(queueListIterator.advance())
+ while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if(!node.isDeleted() && node.acquire())
+ if (!node.isDeleted() && node.acquire())
{
node.discard(storeContext);
count++;
@@ -1110,7 +1059,6 @@
}
-
public void addQueueDeleteTask(final Task task)
{
_deleteTaskList.add(task);
@@ -1126,7 +1074,7 @@
while (subscriptionIter.advance())
{
Subscription s = subscriptionIter.getNode().getSubscription();
- if(s != null)
+ if (s != null)
{
s.queueDeleted(this);
}
@@ -1135,7 +1083,6 @@
_bindings.deregister();
_virtualHost.getQueueRegistry().unregisterQueue(_name);
-
_managedObject.unregister();
for (Task task : _deleteTaskList)
{
@@ -1149,15 +1096,14 @@
}
-
public void deliverAsync()
{
_stateChangeCount.incrementAndGet();
Runner runner = new Runner();
- if(_asynchronousRunner.compareAndSet(null,runner))
- {
+ if (_asynchronousRunner.compareAndSet(null, runner))
+ {
_asyncDelivery.execute(runner);
}
}
@@ -1193,12 +1139,10 @@
}
}
-
private class SubFlushRunner implements ReadWriteRunnable
{
private final Subscription _sub;
-
public SubFlushRunner(Subscription sub)
{
_sub = sub;
@@ -1216,7 +1160,7 @@
{
_logger.error(e);
}
- if(!complete && !_sub.isSuspended())
+ if (!complete && !_sub.isSuspended())
{
_asyncDelivery.execute(this);
}
@@ -1244,25 +1188,25 @@
boolean atTail = false;
boolean advanced;
- while(!sub.isSuspended() && !atTail && deliveries != 0)
+ while (!sub.isSuspended() && !atTail && deliveries != 0)
{
advanced = false;
sub.getSendLock();
try
{
- if(sub.isActive())
+ if (sub.isActive())
{
QueueEntry node = moveSubscriptionToNextNode(sub);
- if(!(node.isAcquired() || node.isDeleted()))
+ if (!(node.isAcquired() || node.isDeleted()))
{
- if(!sub.isSuspended())
+ if (!sub.isSuspended())
{
- if(sub.hasInterest(node))
+ if (sub.hasInterest(node))
{
- if(!sub.wouldSuspend(node))
+ if (!sub.wouldSuspend(node))
{
- if(!sub.isBrowser() && !node.acquire(sub))
+ if (!sub.isBrowser() && !node.acquire(sub))
{
sub.restoreCredit(node);
@@ -1272,11 +1216,11 @@
deliveries--;
deliverMessage(sub, node);
- if(sub.isBrowser())
+ if (sub.isBrowser())
{
QueueEntry newNode = _entries.next(node);
- if(newNode != null)
+ if (newNode != null)
{
advanced = true;
sub.setLastSeenEntry(node, newNode);
@@ -1295,7 +1239,7 @@
{
// this subscription is not interested in this node so we can skip over it
QueueEntry newNode = _entries.next(node);
- if(newNode != null)
+ if (newNode != null)
{
sub.setLastSeenEntry(node, newNode);
}
@@ -1318,12 +1262,12 @@
// next entry they are interested in yet. This would lead to holding on to references to expired messages, etc
// which would give us memory "leak".
- if(!isExclusiveSubscriber())
+ if (!isExclusiveSubscriber())
{
advanceAllSubscriptions();
}
- if(atTail && sub.isAutoClose())
+ if (atTail && sub.isAutoClose())
{
unregisterSubscription(sub);
@@ -1337,7 +1281,7 @@
protected void advanceAllSubscriptions() throws AMQException
{
SubscriptionList.SubscriptionNodeIterator subscriberIter = _subscriptionList.iterator();
- while(subscriberIter.advance())
+ while (subscriberIter.advance())
{
SubscriptionList.SubscriptionNode subNode = subscriberIter.getNode();
Subscription sub = subNode.getSubscription();
@@ -1349,19 +1293,19 @@
throws AMQException
{
QueueEntry node = sub.getLastSeenEntry();
-
- while(node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
+
+ while (node != null && (node.isAcquired() || node.isDeleted() || node.expired()))
{
- if(!node.isAcquired() && !node.isDeleted() && node.expired())
+ if (!node.isAcquired() && !node.isDeleted() && node.expired())
{
- if(node.acquire())
+ if (node.acquire())
{
final StoreContext reapingStoreContext = new StoreContext();
node.discard(reapingStoreContext);
}
}
QueueEntry newNode = _entries.next(node);
- if(newNode != null)
+ if (newNode != null)
{
sub.setLastSeenEntry(node, newNode);
node = sub.getLastSeenEntry();
@@ -1375,7 +1319,6 @@
return node;
}
-
private void processQueue(Runnable runner) throws AMQException
{
long stateChangeCount;
@@ -1385,51 +1328,51 @@
int extraLoops = 1;
int deliveries = MAX_ASYNC_DELIVERIES;
- _asynchronousRunner.compareAndSet(runner,null);
+ _asynchronousRunner.compareAndSet(runner, null);
- while(deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete ) && _asynchronousRunner.compareAndSet(null,runner))
+ while (deliveries != 0 && ((previousStateChangeCount != (stateChangeCount = _stateChangeCount.get())) || deliveryIncomplete) && _asynchronousRunner.compareAndSet(null, runner))
{
// we want to have one extra loop after every subscription has reached the point where it cannot move
// further, just in case the advance of one subscription in the last loop allows a different subscription to
// move forward in the next iteration
- if(previousStateChangeCount != stateChangeCount)
+ if (previousStateChangeCount != stateChangeCount)
{
extraLoops = 1;
}
-
+
previousStateChangeCount = stateChangeCount;
deliveryIncomplete = _subscriptionList.size() != 0;
boolean done = true;
-
SubscriptionList.SubscriptionNodeIterator subscriptionIter = _subscriptionList.iterator();
//iterate over the subscribers and try to advance their pointer
- while(subscriptionIter.advance())
+ while (subscriptionIter.advance())
{
+ boolean closeConsumer = false;
Subscription sub = subscriptionIter.getNode().getSubscription();
- if(sub != null)
+ if (sub != null)
{
sub.getSendLock();
try
{
QueueEntry node = moveSubscriptionToNextNode(sub);
- if(node != null && sub.isActive())
+ if (node != null && sub.isActive())
{
boolean advanced = false;
boolean subActive = false;
- if(!(node.isAcquired() || node.isDeleted()))
+ if (!(node.isAcquired() || node.isDeleted()))
{
- if(!sub.isSuspended())
+ if (!sub.isSuspended())
{
subActive = true;
- if(sub.hasInterest(node))
+ if (sub.hasInterest(node))
{
- if(!sub.wouldSuspend(node))
+ if (!sub.wouldSuspend(node))
{
- if(!sub.isBrowser() && !node.acquire(sub))
+ if (!sub.isBrowser() && !node.acquire(sub))
{
sub.restoreCredit(node);
@@ -1439,32 +1382,31 @@
deliverMessage(sub, node);
deliveries--;
- if(sub.isBrowser())
+ if (sub.isBrowser())
{
QueueEntry newNode = _entries.next(node);
- if(newNode != null)
+ if (newNode != null)
{
sub.setLastSeenEntry(node, newNode);
node = sub.getLastSeenEntry();
advanced = true;
}
-
}
}
done = false;
}
else
{
- node.addStateChangeListener(new QueueEntryListener(sub,node));
+ node.addStateChangeListener(new QueueEntryListener(sub, node));
}
}
else
{
// this subscription is not interested in this node so we can skip over it
QueueEntry newNode = _entries.next(node);
- if(newNode != null)
+ if (newNode != null)
{
sub.setLastSeenEntry(node, newNode);
}
@@ -1475,25 +1417,26 @@
done = done && (!subActive || atTail);
- if(atTail && !advanced && sub.isAutoClose())
- {
- unregisterSubscription(sub);
-
- ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
- converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
-
- }
-
+ closeConsumer = (atTail && !advanced && sub.isAutoClose());
}
}
finally
{
sub.releaseSendLock();
}
+
+ if (closeConsumer)
+ {
+ unregisterSubscription(sub);
+
+ ProtocolOutputConverter converter = sub.getChannel().getProtocolSession().getProtocolOutputConverter();
+ converter.confirmConsumerAutoClose(sub.getChannel().getChannelId(), sub.getConsumerTag());
+ }
+
}
- if(done)
+ if (done)
{
- if(extraLoops == 0)
+ if (extraLoops == 0)
{
deliveryIncomplete = false;
}
@@ -1508,20 +1451,17 @@
}
}
-
-
_asynchronousRunner.set(null);
}
// If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
// therefore we should schedule this runner again (unless someone beats us to it :-) ).
- if(deliveries == 0 && _asynchronousRunner.compareAndSet(null,runner))
+ if (deliveries == 0 && _asynchronousRunner.compareAndSet(null, runner))
{
_asyncDelivery.execute(runner);
}
}
-
public void removeExpiredIfNoSubscribers() throws AMQException
{
@@ -1529,10 +1469,10 @@
QueueEntryIterator queueListIterator = _entries.iterator();
- while(queueListIterator.advance())
+ while (queueListIterator.advance())
{
QueueEntry node = queueListIterator.getNode();
- if(!node.isDeleted() && node.expired() && node.acquire())
+ if (!node.isDeleted() && node.expired() && node.acquire())
{
node.discard(storeContext);
@@ -1542,7 +1482,6 @@
}
-
public long getMinimumAlertRepeatGap()
{
return _minimumAlertRepeatGap;
@@ -1561,7 +1500,7 @@
public void setMaximumMessageAge(long maximumMessageAge)
{
_maximumMessageAge = maximumMessageAge;
- if(maximumMessageAge == 0L)
+ if (maximumMessageAge == 0L)
{
_notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT);
}
@@ -1579,7 +1518,7 @@
public void setMaximumMessageCount(final long maximumMessageCount)
{
_maximumMessageCount = maximumMessageCount;
- if(maximumMessageCount == 0L)
+ if (maximumMessageCount == 0L)
{
_notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT);
}
@@ -1588,8 +1527,6 @@
_notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT);
}
-
-
}
public long getMaximumQueueDepth()
@@ -1601,7 +1538,7 @@
public void setMaximumQueueDepth(final long maximumQueueDepth)
{
_maximumQueueDepth = maximumQueueDepth;
- if(maximumQueueDepth == 0L)
+ if (maximumQueueDepth == 0L)
{
_notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT);
}
@@ -1620,7 +1557,7 @@
public void setMaximumMessageSize(final long maximumMessageSize)
{
_maximumMessageSize = maximumMessageSize;
- if(maximumMessageSize == 0L)
+ if (maximumMessageSize == 0L)
{
_notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT);
}
@@ -1630,7 +1567,6 @@
}
}
-
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -1654,7 +1590,7 @@
public boolean equals(Object o)
{
- return _entry == ((QueueEntryListener)o)._entry && _sub == ((QueueEntryListener)o)._sub;
+ return _entry == ((QueueEntryListener) o)._entry && _sub == ((QueueEntryListener) o)._sub;
}
public int hashCode()