You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by "Rob Davies (JIRA)" <ji...@apache.org> on 2006/11/29 08:55:02 UTC
[jira] Assigned: (AMQ-1070) Deadlock in Queue.java
[ https://issues.apache.org/activemq/browse/AMQ-1070?page=all ]
Rob Davies reassigned AMQ-1070:
-------------------------------
Assignee: Rob Davies
> Deadlock in Queue.java
> ----------------------
>
> Key: AMQ-1070
> URL: https://issues.apache.org/activemq/browse/AMQ-1070
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 4.0.2
> Reporter: Tom Kaitchuck
> Assigned To: Rob Davies
> Fix For: 4.2.0
>
>
> It is possible to have a deadlock as follows:
> "ActiveMQ Transport: tcp:///127.0.0.1:53335":
> at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:66)
> - waiting to lock <0x90786240> (a org.apache.activemq.broker.region.QueueSubscription)
> at org.apache.activemq.broker.region.Queue.addSubscription(Queue.java:192)
> - locked <0x908fa480> (a java.util.LinkedList)
> at org.apache.activemq.broker.region.AbstractRegion.addDestination(AbstractRegion.java:93)
> - locked <0x903b9b40> (a java.lang.Object)
> at org.apache.activemq.broker.region.RegionBroker.addDestination(RegionBroker.java:221)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
> at org.apache.activemq.advisory.AdvisoryBroker.addDestination(AdvisoryBroker.java:142)
> at org.apache.activemq.broker.BrokerFilter.addDestination(BrokerFilter.java:130)
> at org.apache.activemq.broker.MutableBrokerFilter.addDestination(MutableBrokerFilter.java:143)
> at org.apache.activemq.broker.region.AbstractRegion.addConsumer(AbstractRegion.java:182)
> - locked <0x908e6cb8> (a java.lang.Object)
> at org.apache.activemq.broker.region.RegionBroker.addConsumer(RegionBroker.java:297)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
> at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:78)
> at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:74)
> at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:87)
> at org.apache.activemq.broker.AbstractConnection.processAddConsumer(AbstractConnection.java:538)
> at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:296)
> at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
> at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
> at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
> at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
> at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
> at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
> at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
> at java.lang.Thread.run(Thread.java:595)
> "ActiveMQ Transport: tcp:///127.0.0.1:53315":
> at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:321)
> - waiting to lock <0x908fa480> (a java.util.LinkedList)
> at org.apache.activemq.broker.region.Queue.dropEvent(Queue.java:315)
> at org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:54)
> at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:125)
> - locked <0x90786240> (a org.apache.activemq.broker.region.QueueSubscription)
> at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:265)
> at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:366)
> at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177)
> at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
> at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:66)
> at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:79)
> at org.apache.activemq.broker.AbstractConnection.processMessageAck(AbstractConnection.java:445)
> at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179)
> at org.apache.activemq.broker.AbstractConnection.service(AbstractConnection.java:237)
> at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:63)
> at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:92)
> at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
> at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:124)
> at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:123)
> at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:88)
> at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:138)
> at java.lang.Thread.run(Thread.java:595)
> The simple solution is in AbstractRegion.java:
> @@ -89,10 +89,12 @@
> // Add all consumers that are interested in the destination.
> for (Iterator iter = subscriptions.values().iterator(); iter.hasNext();) {
> Subscription sub = (Subscription) iter.next();
> + synchronized (sub) {
> if( sub.matches(destination) ) {
> dest.addSubscription(context, sub);
> }
> }
> + }
> return dest;
> }
> }
> @@ -104,11 +106,13 @@
> if( timeout == 0 ) {
> for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
> Subscription sub=(Subscription) iter.next();
> + synchronized (sub) {
> if(sub.matches(destination)){
> throw new JMSException("Destination still has an active subscription: "+destination);
> }
> }
> }
> + }
> if( timeout > 0 ) {
> // TODO: implement a way to notify the subscribers that we want to take the down
> @@ -125,10 +129,12 @@
> // timeout<0 or we timed out, we now force any remaining subscriptions to un-subscribe.
> for(Iterator iter=subscriptions.values().iterator();iter.hasNext();){
> Subscription sub=(Subscription) iter.next();
> + synchronized (sub) {
> if(sub.matches(destination)){
> dest.removeSubscription(context, sub);
> }
> }
> + }
> destinationMap.removeAll(destination);
> dest.dispose(context);
> @@ -173,7 +179,8 @@
> // broker will not see a destination that exists in persistent store. We may want to
> // eagerly load all destinations into the broker but have an inactive state for the
> // destination which has reduced memory usage.
> - //
> + synchronized (sub)
> + {
> if( persistenceAdapter!=null ) {
> Set inactiveDests = getInactiveDestinations();
> for (Iterator iter = inactiveDests.iterator(); iter.hasNext();) {
> @@ -183,6 +190,7 @@
> }
> }
> }
> + }
> subscriptions.put(info.getConsumerId(), sub);
> @@ -193,16 +201,16 @@
> // no mutex held. Remove is only essentially run once
> // so everything after this point would be leaked.
> + synchronized (sub) {
> // Add the subscription to all the matching queues.
> for (Iterator iter = destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
> Destination dest = (Destination) iter.next();
> dest.addSubscription(context, sub);
> }
> -
> if( info.isBrowser() ) {
> ((QueueBrowserSubscription)sub).browseDone();
> }
> -
> + }
> return sub;
> }
> }
--
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly, contact one of the administrators: https://issues.apache.org/activemq/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira