Posted to users@activemq.apache.org by igah <we...@mail.com> on 2007/01/25 19:53:58 UTC

couldn't add consumers after existing consumers are killed (deadlock in broker)

I am using activemq-4.0.2. I am running a test with 6 producers continuously
sending STOMP messages to a queue, with 6 consumers on the same queue. If I
kill -9 all the consumers (which are all in one process) and restart them,
sometimes (about 30% of the time) the consumers cannot get any messages
anymore. If I inspect the broker, I see only one consumer registered.

After some digging, I found that the reason is that
org.apache.activemq.broker.region.Queue.addSubscription is blocked at
dispatchValve.turnOff().

That happens because, when the previous consumers are killed,
dispatchValve.turnOn() (line 241) in Queue.removeSubscription is never invoked
on the broker side. A little more poking reveals that in all the failed cases,
line 233: dispatchPolicy.dispatch(context, node, msgContext, consumers)
invokes removeSubscription again, causing the deadlock (i.e. the thread that
is supposed to call dispatchValve.turnOn() re-enters removeSubscription,
which calls dispatchValve.turnOff() and blocks).
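
To make the re-entry described above concrete, here is a minimal sketch in
Java. It is not ActiveMQ code: SimpleValve is a hypothetical stand-in that only
models the behavior reported in this post (a second turnOff() blocks until
turnOn() is called), and its turnOff() takes a timeout so the demo terminates
instead of hanging the way the broker thread does.

```java
class ValveReentryDemo {

    static final long TIMEOUT_MS = 200;

    /** Hypothetical, simplified valve; NOT the broker's actual implementation. */
    static final class SimpleValve {
        private boolean off = false;

        /**
         * Waits until the valve is on, then turns it off.
         * Returns false if the valve was still off when the timeout expired.
         */
        synchronized boolean turnOff(long timeoutMillis) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            try {
                while (off) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false;
                    }
                    wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            off = true;
            return true;
        }

        synchronized void turnOn() {
            off = false;
            notifyAll();
        }
    }

    /**
     * Mimics the shape of the reported call path: turn the valve off,
     * dispatch, turn the valve back on. If the transport is dead, the
     * dispatch failure re-enters this method on the same thread.
     */
    static boolean removeSubscription(SimpleValve valve, boolean transportDead) {
        if (!valve.turnOff(TIMEOUT_MS)) {
            // A turnOff() without a timeout would block here forever,
            // because only this same thread could ever call turnOn().
            return false;
        }
        try {
            if (transportDead) {
                // Stand-in for: dispatch fails, the connection is stopped,
                // and the stop path calls removeSubscription again.
                return removeSubscription(valve, false);
            }
            return true;
        } finally {
            valve.turnOn();
        }
    }

    public static void main(String[] args) {
        System.out.println("healthy transport: "
                + removeSubscription(new SimpleValve(), false));
        System.out.println("dead transport:    "
                + removeSubscription(new SimpleValve(), true));
    }
}
```

In the dead-transport case the nested call gives up after the timeout; with a
turnOff() that waits indefinitely, the thread would never reach turnOn(), and
any later addSubscription would block as well.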

The second "self" call is preceded by this transport exception in the log
(probably because I killed the consumers):

2007-01-25 13:29:46,662 [127.0.0.1:56489] DEBUG Transport - Transport failed: java.io.IOException: The transport tcp:///127.0.0.1:56486 of type: org.apache.activemq.transport.tcp.TcpTransport is not running.
java.io.IOException: The transport tcp:///127.0.0.1:56486 of type: org.apache.activemq.transport.tcp.TcpTransport is not running.
        at org.apache.activemq.transport.TransportSupport.checkStarted(TransportSupport.java:109)
        at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:117)
        at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:142)
        at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
        at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:87)
        at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:45)
        at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:215)
        at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
        at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
        at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
        at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
        at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
        at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
        at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
        at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
        at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
        at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
        at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
        at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
        at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
        at org.apache.activemq.broker.TransportConnection$1.onException(TransportConnection.java:67)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:111)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
        at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:130)
        at org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:150)
        at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:101)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:150)
        at java.lang.Thread.run(Thread.java:595)


And here is the stack I printed when the "self" call to removeSubscription
happens:

        at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:181)
        at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
        at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
        at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
        at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
        at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
        at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
        at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
        at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
        at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
        at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
        at org.apache.activemq.broker.AbstractConnection.serviceException(AbstractConnection.java:191)
        at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:218)
        at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
        at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
        at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
        at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
        at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
        at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
        at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
        at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)

It seems that serviceTransportException(AbstractConnection.java:183) is where
the self call is triggered.
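
The trace suggests the connection is stopped inline, on the very thread that
is still dispatching. One general way to avoid that kind of re-entry is to
hand the stop to another thread so the dispatching thread can unwind first.
The sketch below only illustrates that idea; the Connection class, its fields,
and the single-thread executor are assumptions made for the example, and
nothing here is claimed to be how the broker or the 4.2 change actually works.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredStopDemo {

    /** Hypothetical connection; method names only echo the stack trace. */
    static final class Connection {
        private final ExecutorService cleanupExecutor =
                Executors.newSingleThreadExecutor();
        private final AtomicBoolean stopRequested = new AtomicBoolean(false);
        final AtomicInteger stopCount = new AtomicInteger();
        final CountDownLatch stopped = new CountDownLatch(1);
        volatile Thread stopThread;

        /** Called on the dispatching thread when a send fails. */
        void serviceTransportException(Exception cause) {
            // Schedule the stop at most once, and never run it inline
            // on the caller's thread.
            if (stopRequested.compareAndSet(false, true)) {
                cleanupExecutor.execute(this::stop);
                cleanupExecutor.shutdown(); // the queued stop still runs
            }
        }

        private void stop() {
            // Consumer removal would happen here, after the dispatching
            // thread has had the chance to finish its own work.
            stopThread = Thread.currentThread();
            stopCount.incrementAndGet();
            stopped.countDown();
        }
    }

    /** True if stop ran exactly once, on a thread other than the caller. */
    static boolean stopRunsOffCallerThread() {
        Connection connection = new Connection();
        connection.serviceTransportException(new IOException("transport not running"));
        connection.serviceTransportException(new IOException("transport not running"));
        try {
            if (!connection.stopped.await(2, TimeUnit.SECONDS)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return connection.stopCount.get() == 1
                && connection.stopThread != Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("stop deferred: " + stopRunsOffCallerThread());
    }
}
```

With the stop deferred like this, the thread inside removeSubscription would
return and call dispatchValve.turnOn() before the cleanup thread asks to turn
the valve off again.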

Could someone look into this and let me know if there is a simple fix for this
deadlock condition? Thanks in advance.
-- 
View this message in context: http://www.nabble.com/couldn%27t-add-consumers-after-existing-consumers-are-killed-%28deadlock-in-broker%29-tf3118423.html#a8639042
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Re: couldn't add consumers after existing consumers are killed (deadlock in broker)

Posted by igah <we...@mail.com>.
Thanks for the info. Is it possible to get a patch for 4.0.2? I cannot really
upgrade to 4.2 at this moment. Thanks a lot.


I feel your pain - this is fixed in 4.2 - you can get a snapshot here:

http://people.apache.org/repo/m2-snapshot-repository/org/apache/activemq/apache-activemq/4.2-incubator-SNAPSHOT/

cheers,

Rob



Re: couldn't add consumers after existing consumers are killed (deadlock in broker)

Posted by Rob Davies <ra...@gmail.com>.
I understand you can't upgrade. Unfortunately there are too many
changes to do a patch :(

cheers,

Rob
On 26 Jan 2007, at 16:20, igah wrote:

>
> i tried the latest snapshot. it appears to have solved the problem.
>
> but i cannot upgrade to 4.2. is there a patch for 4.0.2 or could  
> you point
> out what files have been changed to fix this so that  i can  
> manually patch
> 4.0.2?
>
> thanks a lot.
>
>


Re: couldn't add consumers after existing consumers are killed (deadlock in broker)

Posted by igah <we...@mail.com>.
I tried the latest snapshot. It appears to have solved the problem.

But I cannot upgrade to 4.2. Is there a patch for 4.0.2, or could you point
out which files were changed to fix this so that I can manually patch
4.0.2?

Thanks a lot.


rajdavies wrote:
> 
> I feel your pain - this is fixed in 4.2 - you can get a snapshot here:
> 
> http://people.apache.org/repo/m2-snapshot-repository/org/apache/activemq/apache-activemq/4.2-incubator-SNAPSHOT/
> 
> cheers,
> 
> Rob



Re: couldn't add consumers after existing consumers are killed (deadlock in broker)

Posted by Rob Davies <ra...@gmail.com>.
I feel your pain - this is fixed in 4.2 - you can get a snapshot here:

http://people.apache.org/repo/m2-snapshot-repository/org/apache/activemq/apache-activemq/4.2-incubator-SNAPSHOT/

cheers,

Rob

On 25 Jan 2007, at 18:53, igah wrote:

>
> i am using activemq-4.0.2.  i am running a test with 6 producers  
> continously
> sending stomp message to a queue, with 6 consumers on the same  
> queue. if i
> kill -9 all the consumers (which are all in one process), and  
> restart the
> consumers again, sometimes (about 30% of the time) the consumers  
> cannot get
> any messages anymore. if i inspect the broker, i see only one consumer
> registered.
>
> after some digging myself, i found out the reason is that
> org.apache.activemq.broker.region.Queue.addSubscription is blocked at
> dispatchValve.turnOff()
>
> the reason for that is when the previous consumers are killed, on  
> the broker
> side, in Queue.removeSubscription,  dispatchValve.turnOn() (line  
> 241)  is
> not invoked. a little bit more poking reveals that in all the  
> failed cases,
> line 233: dispatchPolicy.dispatch(context, node, msgContext,  
> consumers)
> invokes removeSubscription again, therefore causing the deadlock  
> (i.e. the
> thread that's supposed to call dispatchValve.turnOn again calls
> removeSubscription which calls dispatchValve.turnOff which is blocked)
>
> the 2nd "self" call is preceded by this transport exception in the  
> log:
> (probably because i killed the consumers)
>
> 2007-01-25 13:29:46,662 [127.0.0.1:56489] DEBUG Transport
> - Transport failed: java.io.IOException: The transport
> tcp:///127.0.0.1:56486 of type:
> org.apache.activemq.transport.tcp.TcpTransport is not running.
> java.io.IOException: The transport tcp:///127.0.0.1:56486 of type:
> org.apache.activemq.transport.tcp.TcpTransport is not running.
>         at org.apache.activemq.transport.TransportSupport.checkStarted(TransportSupport.java:109)
>         at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:117)
>         at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:142)
>         at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
>         at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:87)
>         at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:45)
>         at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
>         at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:215)
>         at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
>         at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
>         at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
>         at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
>         at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
>         at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
>         at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
>         at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
>         at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
>         at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
>         at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
>         at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
>         at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
>         at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
>         at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
>         at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
>         at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
>         at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
>         at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
>         at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
>         at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
>         at org.apache.activemq.broker.TransportConnection$1.onException(TransportConnection.java:67)
>         at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
>         at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:111)
>         at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
>         at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
>         at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:130)
>         at org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:150)
>         at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:101)
>         at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:150)
>         at java.lang.Thread.run(Thread.java:595)
>
>
> and i print out the stack when the "self" call to removeSubscription is
> happening:
>
>         at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:181)
>         at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
>         at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
>         at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
>         at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
>         at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
>         at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
>         at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
>         at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
>         at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
>         at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
>         at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
>         at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
>         at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
>         at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
>         at org.apache.activemq.broker.AbstractConnection.serviceException(AbstractConnection.java:191)
>         at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:218)
>         at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
>         at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
>         at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
>         at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
>         at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
>         at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
>         at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
>         at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
>
> seems that serviceTransportException(AbstractConnection.java:183) is where
> the self call is triggered.
>
> could someone look into this and let me know if there is a simple fix for this
> deadlock condition? thanks in advance.
>
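
For anyone reading this in the archive, here is a minimal sketch of the re-entrancy problem described in the quoted report. It is not the ActiveMQ code: SimpleValve, the removeSubscription helper and the 200 ms timeout are all hypothetical stand-ins, and the timeout exists only so the demo terminates instead of hanging the way the broker thread does.

```java
public class ValveReentrancyDemo {

    /** Hypothetical stand-in for a dispatch valve. This is NOT the ActiveMQ class. */
    static final class SimpleValve {
        private boolean on = true;

        /** Waits until the valve is on, then turns it off. Returns false on timeout. */
        synchronized boolean turnOff(long timeoutMillis) {
            long deadline = System.currentTimeMillis() + timeoutMillis;
            try {
                while (!on) {
                    long remaining = deadline - System.currentTimeMillis();
                    if (remaining <= 0) {
                        return false; // without a timeout the caller would wait forever
                    }
                    wait(remaining);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            on = false;
            return true;
        }

        /** Turns the valve back on and wakes any thread waiting in turnOff. */
        synchronized void turnOn() {
            on = true;
            notifyAll();
        }
    }

    /**
     * Assumed shape of the removal path: turn the valve off, do some work,
     * turn it back on. nestedCalls > 0 simulates the dispatch step failing
     * and calling back into removal on the same thread.
     */
    static boolean removeSubscription(SimpleValve valve, int nestedCalls) {
        if (!valve.turnOff(200)) {
            return false; // the nested call ends up here: the valve is already off
        }
        try {
            if (nestedCalls > 0) {
                return removeSubscription(valve, nestedCalls - 1);
            }
            return true;
        } finally {
            valve.turnOn(); // only the outermost caller ever reaches this
        }
    }

    public static void main(String[] args) {
        SimpleValve valve = new SimpleValve();
        System.out.println("plain removal completed:  " + removeSubscription(valve, 0));
        System.out.println("nested removal completed: " + removeSubscription(valve, 1));
    }
}
```

The plain call completes, while the nested call gives up after the timeout because the only thread that could call turnOn() is the one waiting inside turnOff(). With an untimed wait, as in the report, that thread never returns and later addSubscription calls block behind it. This only illustrates the failure mode and says nothing about how the 4.2 fix is implemented.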