You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2011/07/29 15:31:16 UTC

[jira] [Resolved] (AMQ-2908) Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.

     [ https://issues.apache.org/jira/browse/AMQ-2908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish resolved AMQ-2908.
-------------------------------

       Resolution: Fixed
    Fix Version/s: 5.6.0

The addition of the inactive destination cleanup and the auto sweep for expired messages should resolve this issue.

> Slow consumer stops receiving messages because PrefetchSubscription.dispatched is filled with expired messages.
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-2908
>                 URL: https://issues.apache.org/jira/browse/AMQ-2908
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.3.2
>            Reporter: Siim Kaalep
>            Assignee: Gary Tully
>             Fix For: 5.6.0
>
>
> Slow consumer gets stuck when consuming from queue that has expiring messages in it. 
> Looked into broker while it got stuck and saw that PrefetchSubscription.dispatched is full of expired messages.
> WORKAROUND
> Into doActualDispatch added check that if subscription is full, it will remove all expired message from dispatch.
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java               (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java            (working copy)
> @@ -400,6 +400,21 @@
>          }
>      }
>  
> +   public void removeExpiredMessagesFromDispatch() {
> +     synchronized(dispatchLock) {
> +                  for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); ) {
> +                    final MessageReference node = iter.next();
> +                    if (node.isExpired()) {
> +                        if (broker.isExpired(node)) {
> +                            node.getRegionDestination().messageExpired(context, this, node);
> +                        }
> +                        dispatched.remove(node);
> +                        node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
> +                    }
> +                  }
> +     }
> +   }
> +    
>      /**
>       * Checks an ack versus the contents of the dispatched list.
>       * 
> {code}
> {code}
> Index: trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
> ===================================================================
> --- trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java           (revision 42304)
> +++ trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java        (working copy)
> @@ -1543,6 +1543,9 @@
>                  }
>                  if (dispatchSelector.canSelect(s, node)) {
>                      if (!fullConsumers.contains(s)) {
> +                               if (s.isFull() && s instanceof PrefetchSubscription) {
> +                                             ((PrefetchSubscription)s).removeExpiredMessagesFromDispatch();
> +                               }
>                          if (!s.isFull()) {
>                              // Dispatch it.
>                              s.add(node);
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira