Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2012/12/11 20:19:21 UTC

[jira] [Commented] (AMQCPP-445) Prefetch_size = 0 and failover: consumer blocked in receive() when broker restarts

    [ https://issues.apache.org/jira/browse/AMQCPP-445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13529232#comment-13529232 ] 

Timothy Bish commented on AMQCPP-445:
-------------------------------------

I believe that this, as well as other scenarios like it, is fixed in the v3.5 code on trunk. You can download the latest SNAPSHOT source bundle here and try it out:
http://people.apache.org/~tabish/cms-3.5.x/

This isn't fixable on the v3.4.x baseline code, as it requires changes that break the ABI.

> Prefetch_size = 0 and failover: consumer blocked in receive() when broker restarts
> ----------------------------------------------------------------------------------
>
>                 Key: AMQCPP-445
>                 URL: https://issues.apache.org/jira/browse/AMQCPP-445
>             Project: ActiveMQ C++ Client
>          Issue Type: Bug
>          Components: CMS Impl
>    Affects Versions: 3.4.4
>         Environment: Ubuntu 10.04/12.04 x86_64, ActiveMQ 5.5.0/5.7.0
>            Reporter: Thomas Riccardi
>            Assignee: Timothy Bish
>              Labels: blocked, consumer, dequeue, failover, prefetch_size, receive
>
> With prefetch_size=0, the consumer.receive() sends a pull request to the broker, and then waits on dequeue() for a new message.
> With failover, if the broker is restarted after having received the pull request, then the consumer is blocked indefinitely in receive(), and the broker never sends messages to it.
> Here is the stack of the blocked consumer. It never returned from the first call to receive().
> {quote}
> #0  pthread_cond_wait@@GLIBC_2.3.2 ()
> at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_wait.S:162
> #1  0x00007f2528c8fdd7 in decaf::internal::util::concurrent::ConditionImpl::wait (condition=0x118a024)
> at decaf/internal/util/concurrent/unix/ConditionImpl.cpp:101
> #2  0x00007f2528b23075 in activemq::core::SimplePriorityMessageDispatchChannel::dequeue (this=0x1181b40, timeout=-1)
> at activemq/core/SimplePriorityMessageDispatchChannel.cpp:77
> #3  0x00007f2528ad6d78 in activemq::core::ActiveMQConsumer::dequeue (this=0x11892d0, timeout=-1) at activemq/core/ActiveMQConsumer.cpp:553
> #4  0x00007f2528adbef1 in activemq::core::ActiveMQConsumer::receive (this=0x11892d0) at activemq/core/ActiveMQConsumer.cpp:601
> {quote}
> It seems the failover mechanism never notifies the consumer->internal->unconsumedMessages fifo, so the consumer stays blocked on dequeue(), while the new broker doesn't know the consumer has sent a pull request.
> I haven't found any use of the callback transportResumed() from the failover transport.
> A solution would be to use this callback to call notifyAll() on the internal queues of all consumers.
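
The wake-up idea proposed in the description can be sketched with the standard library alone. This is only an illustration under stated assumptions, not the ActiveMQ-CPP implementation and not necessarily how v3.5 resolves the issue: PullChannel, notifyResumed(), generation() and the receive() helper are hypothetical names invented for this example. The channel stands in for the consumer's internal unconsumed-message queue. A blocked dequeue() returns early when a resume notification arrives, so the receive loop can send the pull request again to the restarted broker, which has no record of the earlier one.

```cpp
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <mutex>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a consumer's internal unconsumed-message queue.
class PullChannel {
public:
    // Called when the broker dispatches a message to this consumer.
    void enqueue(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(msg));
        }
        cond_.notify_all();
    }

    // Assumed to be called from a transport-resumed style callback
    // once the failover transport has reconnected.
    void notifyResumed() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++generation_;
        }
        cond_.notify_all();
    }

    // Number of resume notifications seen so far.
    std::uint64_t generation() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return generation_;
    }

    // Blocks until a message is queued or a resume newer than `seen` occurs.
    // Returns std::nullopt when woken by a resume with nothing queued.
    std::optional<std::string> dequeue(std::uint64_t seen) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [&] { return !queue_.empty() || generation_ != seen; });
        if (queue_.empty()) {
            return std::nullopt;
        }
        std::string msg = std::move(queue_.front());
        queue_.pop_front();
        return msg;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cond_;
    std::deque<std::string> queue_;
    std::uint64_t generation_ = 0;
};

// Hypothetical receive loop for the prefetch_size=0 case: send a pull
// request, wait, and send it again if the transport was resumed meanwhile.
template <typename SendPull>
std::string receive(PullChannel& channel, SendPull sendPull) {
    for (;;) {
        // Read the generation before pulling so that a resume arriving
        // between the pull and the wait is not missed.
        const std::uint64_t seen = channel.generation();
        sendPull();
        if (auto msg = channel.dequeue(seen)) {
            return *msg;
        }
    }
}
```

Waking the waiter is not enough on its own: in this sketch the loop also repeats the pull request, because the restarted broker never saw the first one.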
