You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Gary Tully (JIRA)" <ji...@apache.org> on 2010/09/10 11:10:40 UTC
[jira] Commented: (AMQ-2908) Slow consumer stops receiving messages
because PrefetchSubscription.dispatched is filled with expired messages.
[ https://issues.apache.org/activemq/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=61762#action_61762 ]
Gary Tully commented on AMQ-2908:
---------------------------------
do you have a test case for this?
> Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-2908
> URL: https://issues.apache.org/activemq/browse/AMQ-2908
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 5.3.2
> Reporter: Siim Kaalep
>
> Slow consumer gets stuck when consuming from queue that has expiring messages in it.
> Looked into broker while it got stuck and saw that PrefetchSubscription.dispatched is full of expired messages.
> WORKAROUND
> Into doActualDispatch added check that if subscription is full, it will remove all expired message from dispatch.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (working copy)
> @@ -400,6 +400,21 @@
> }
> }
>
> + public void removeExpiredMessagesFromDispatch() {
> + synchronized(dispatchLock) {
> + for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> + final MessageReference node = iter.next();
> + if (node.isExpired()) {
> + if (broker.isExpired(node)) {
> + node.getRegionDestination().messageExpired(context, this, node);
> + }
> + dispatched.remove(node);
> + node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> + }
> + }
> + }
> + }
> +
> /**
> * Checks an ack versus the contents of the dispatched list.
> *
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (working copy)
> @@ -1543,6 +1543,9 @@
> }
> if (dispatchSelector.canSelect(s, node)) {
> if (!fullConsumers.contains(s)) {
> + if (s.isFull() && s instanceof PrefetchSubscription) {
> + ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> + }
> if (!s.isFull()) {
> // Dispatch it.
> s.add(node);
> {code}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.