Posted to dev@activemq.apache.org by "Stirling Chow (JIRA)" <ji...@apache.org> on 2011/01/10 22:47:45 UTC

[jira] Created: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
------------------------------------------------------------------------------------------------------

                 Key: AMQ-3127
                 URL: https://issues.apache.org/jira/browse/AMQ-3127
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 5.4.2
            Reporter: Stirling Chow
            Priority: Critical


Symptom
=======
We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically, we have a catastrophic (cause still unknown) network outage that only affects the outbound bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound bridges appear to be recreated properly, but messages produced by the affected broker to *some* of its shared queues/topics are no longer dispatched to the remote brokers.

We have verified that the cause of this issue exists in AMQ 5.4.2.

Cause
=====
Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService threads, which was dispatching a message across an outbound bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was receiving a new subscription from the outbound bridge:

Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: Object  (id=104)	
	owns: Object  (id=105)	
	owns: Object  (id=106)	
	owns: Queue$3  (id=107)	
	waiting for: Object  (id=108)	
		owned by: Daemon Thread [VMTransport] (Running)	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738	
	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
	ResponseCorrelator.onCommand(Object) line: 116	
	MutexTransport(TransportFilter).onCommand(Object) line: 69	
	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
	VMTransport.oneway(Object) line: 113	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
	Queue.pageInMessages(boolean) line: 1898	
	Queue.iterate() line: 1425	
	PooledTaskRunner.runTask() line: 122	
	PooledTaskRunner$1.run() line: 43	
	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
	ThreadPoolExecutor$Worker.run() line: 908	
	Thread.run() line: 662	

Daemon Thread [VMTransport] (Suspended)	
	owns: Object  (id=499)	
	owns: RegionBroker$1  (id=205)	
		waited by: Daemon Thread [VMTransport] (Running)	
		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
	owns: Object  (id=108)	
		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: URI  (id=500)	
	Unsafe.park(boolean, long) line: not available [native method]	
	LockSupport.park(Object) line: 158	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
	ReentrantReadWriteLock$WriteLock.lock() line: 807	
	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290	
	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444	
	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95	
	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550	
	ConsumerInfo.visit(CommandVisitor) line: 349	
	
Specifically, a message had been produced to one of the shared queues and was being dispatched to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired the write lock of pagedInPendingDispatchLock in Queue.java:

    private void doDispatch(List<QueueMessageReference> list) throws Exception {
        boolean doWakeUp = false;

        pagedInPendingDispatchLock.writeLock().lock();
	
BrokerService had sent the message to the remote broker and was then sending the acknowledgement to the local broker in DemandForwardingBridgeSupport.java:

    protected void serviceLocalCommand(Command command) {
    ...
                        if (!message.isResponseRequired()) {
                            
                            // If the message was originally sent using async
                            // send, we will preserve that QOS
                            // by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));

Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the write mutex in MutexTransport.java:

    public void oneway(Object command) throws IOException {
        synchronized (writeMutex) {
            next.oneway(command);
        }
    }

So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock and was trying to acquire MutexTransport.writeMutex.
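To make the synchronous/asynchronous distinction concrete, here is a minimal Java sketch that does not use ActiveMQ at all. The Transport interface and the synchronous(...)/asynchronous(...) factories are hypothetical stand-ins; the only behaviour they assume is what the stack trace above shows, namely that with a synchronous VM transport VMTransport.oneway calls straight into the peer's listener on the calling thread, so any lock the caller holds (including the write mutex) is still held while the peer processes the command.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical model of a VM-style transport; not ActiveMQ's VMTransport. */
public class InlineVsQueuedTransport {

    interface Transport {
        void oneway(Object command);
    }

    /** async=false style: the peer's listener runs inline on the caller's thread. */
    static Transport synchronous(Consumer<Object> peerListener) {
        return peerListener::accept;
    }

    /** async=true style: the command is queued for a separate peer thread. */
    static Transport asynchronous(Consumer<Object> peerListener, ExecutorService peerThread) {
        return command -> peerThread.execute(() -> peerListener.accept(command));
    }

    /** Sends one command while holding a write mutex and reports which thread handled it. */
    static String listenerThread(boolean async) throws Exception {
        CompletableFuture<String> handledBy = new CompletableFuture<>();
        Consumer<Object> listener = command -> handledBy.complete(Thread.currentThread().getName());
        ExecutorService peer = Executors.newSingleThreadExecutor(r -> new Thread(r, "peer-listener"));
        try {
            Transport transport = async ? asynchronous(listener, peer) : synchronous(listener);
            Object writeMutex = new Object();
            synchronized (writeMutex) { // mimics MutexTransport.oneway
                transport.oneway("ConsumerInfo");
            }
            return handledBy.get(5, TimeUnit.SECONDS);
        } finally {
            peer.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("sync  handled by: " + listenerThread(false));
        System.out.println("async handled by: " + listenerThread(true));
    }
}
```

Run from main, the synchronous variant reports the caller's own thread ("main"), while the queued variant reports "peer-listener": the caller does not wait for the command to be processed, so it can release its mutex independently of the peer's work.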

At the same time, a new remote consumer was being registered through the same outbound bridge over which the aforementioned dispatch was occurring.  The bridge's remote transport listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:

    protected void addSubscription(DemandSubscription sub) throws IOException {
        if (sub != null) {
            localBroker.oneway(sub.getLocalInfo());
        }
    }

Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.  Registration of consumers to a queue is synchronized with the dispatching of messages, as shown in Queue.java:

    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
        super.addSubscription(context, sub);
        // synchronize with dispatch method so that no new messages are sent
        // while setting up a subscription. avoid out of order messages,
        // duplicates, etc.
        pagedInPendingDispatchLock.writeLock().lock();

So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
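The cycle above is a lock-ordering inversion: one thread takes the dispatch lock and then the write mutex, while the other takes them in the opposite order. The following Java sketch reproduces the same shape outside ActiveMQ, assuming a ReentrantReadWriteLock as a hypothetical stand-in for pagedInPendingDispatchLock and a plain monitor for MutexTransport.writeMutex. A CountDownLatch forces the bad interleaving so it happens on every run, and the JDK's ThreadMXBean.findDeadlockedThreads() confirms the cycle, much as the debugger's thread view did above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical reproduction of the AMQ-3127 lock-order inversion; not ActiveMQ code. */
public class LockOrderInversionDemo {

    /** Returns true if the two demo threads are reported as deadlocked within about 5 seconds. */
    public static boolean reproduce() throws InterruptedException {
        // Stand-in for Queue.pagedInPendingDispatchLock.
        ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock();
        // Stand-in for MutexTransport.writeMutex.
        Object writeMutex = new Object();
        // Released only once each thread holds its first lock, forcing the bad interleaving.
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Like the "BrokerService Task" thread: dispatch lock first, then the write mutex.
        Thread dispatcher = new Thread(() -> {
            dispatchLock.writeLock().lock();
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (writeMutex) {
                    // would send the MessageAck here
                }
            } finally {
                dispatchLock.writeLock().unlock();
            }
        }, "dispatcher");

        // Like the "VMTransport" thread: write mutex first, then the dispatch lock.
        Thread registrar = new Thread(() -> {
            synchronized (writeMutex) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                dispatchLock.writeLock().lock(); // would add the subscription here
                dispatchLock.writeLock().unlock();
            }
        }, "registrar");

        // Daemon threads so the JVM can still exit once they are stuck.
        dispatcher.setDaemon(true);
        registrar.setDaemon(true);
        dispatcher.start();
        registrar.start();

        Set<Long> demoThreads = Set.of(dispatcher.getId(), registrar.getId());
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int attempt = 0; attempt < 50; attempt++) {
            long[] ids = threads.findDeadlockedThreads(); // null when no cycle exists
            if (ids != null) {
                Set<Long> found = new HashSet<>();
                for (long id : ids) {
                    found.add(id);
                }
                if (found.containsAll(demoThreads)) {
                    return true;
                }
            }
            Thread.sleep(100);
        }
        return false;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlock reproduced: " + reproduce());
    }
}
```

The latch is what makes this sketch deterministic; without it the two threads only deadlock when their timing overlaps, which matches the intermittent behaviour described for the real bridge.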

Solution
======
Deadlock can be avoided by making the local transport asynchronous.  The remote transport listener thread would then hold MutexTransport.writeMutex only long enough to enqueue the command, offloading the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included a unit test that passes with this change.
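Continuing the same hypothetical model (not ActiveMQ code, and not the attached patch), the sketch below shows why an asynchronous hand-off breaks the cycle. The registering thread still takes the write mutex, but instead of acquiring the dispatch lock itself it queues that work for a separate peer thread and releases the mutex immediately. A single-thread ExecutorService is assumed here as a stand-in for the async transport's listener thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical model of the proposed async hand-off; not the AMQ-3127 patch. */
public class AsyncHandoffDemo {

    /** Returns true if dispatch and subscription both complete despite the same interleaving. */
    public static boolean completes() throws InterruptedException {
        ReentrantReadWriteLock dispatchLock = new ReentrantReadWriteLock(); // pagedInPendingDispatchLock
        Object writeMutex = new Object();                                   // MutexTransport.writeMutex
        // Stand-in for the async transport's peer listener thread.
        ExecutorService peerListener = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "peer-listener");
            t.setDaemon(true);
            return t;
        });
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        CountDownLatch subscriptionAdded = new CountDownLatch(1);

        Thread dispatcher = new Thread(() -> {
            dispatchLock.writeLock().lock();
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (writeMutex) {
                    // send the MessageAck; the mutex frees up once the registrar has queued its work
                }
            } finally {
                dispatchLock.writeLock().unlock();
            }
        }, "dispatcher");

        Thread registrar = new Thread(() -> {
            synchronized (writeMutex) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                // Queue the lock-taking work instead of doing it while holding writeMutex.
                peerListener.execute(() -> {
                    dispatchLock.writeLock().lock();
                    try {
                        subscriptionAdded.countDown(); // subscription "registered"
                    } finally {
                        dispatchLock.writeLock().unlock();
                    }
                });
            } // writeMutex released here without waiting for the peer thread
        }, "registrar");

        dispatcher.setDaemon(true);
        registrar.setDaemon(true);
        dispatcher.start();
        registrar.start();

        dispatcher.join(5_000);
        registrar.join(5_000);
        boolean ok = subscriptionAdded.await(5, TimeUnit.SECONDS)
                && !dispatcher.isAlive() && !registrar.isAlive();
        peerListener.shutdown();
        return ok;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("completed without deadlock: " + completes());
    }
}
```

The trade-off in this model is that the subscription is registered some time after oneway returns, so the registering thread can no longer assume the subscription is active as soon as its call completes.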

There is no clear reason why the local transport is synchronous.  This is enforced by BrokerService.java when it starts the network connectors:

    protected void startAllConnectors() throws Exception {
....
            URI uri = getVmConnectorURI();
            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");

This change was made by the following checkin, but no rationale was given:

Revision: 553094
Author: rajdavies
Date: 11:33:48 PM, July 3, 2007
Message:
set async=false for network connectors
----
Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java

Addendum
=========
We've included a unit test that reproduces the deadlock every time on our systems; because this is a timing issue, you may need to run it several times elsewhere to reproduce the deadlock.  Also note that two specific configurations must be present for the deadlock to occur:

1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
2) The message producers must be transactionalized; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).

If either of these conditions is not present, the deadlock (at least in this reproduction) does not occur.

Through further testing 

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3127:
-------------------------------

    Attachment:     (was: BridgeDeadlockTest.java)

>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java



[jira] [Commented] (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13483727#comment-13483727 ] 

Stirling Chow commented on AMQ-3127:
------------------------------------

The only change in the updated BridgeDeadlockTest is the addition of line 72: persistentDelivery = false

The original unit test passed on trunk because of changes made to DemandForwardingBridgeSupport.java:

{code:title=DemandForwardingBridgeSupport.java}
protected void serviceLocalCommand(Command command) {
...
                    if (!configuration.isAlwaysSyncSend() && !message.isPersistent()) {

                        // If the message was originally sent using async
                        // send, we will preserve that QOS
                        // by bridging it using an async send (small chance
                        // of message loss).
                        try {
                            remoteBroker.oneway(message);
                            localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                            dequeueCounter.incrementAndGet();
                        } finally {
                            sub.decrementOutstandingResponses();
                        }

                    } else {
{code}

As originally described, the problematic code is the localBroker.oneway(...) call.  In trunk, the condition (!configuration.isAlwaysSyncSend() && !message.isPersistent()) was preventing the original test from entering this block.  Setting persistentDelivery = false allows the block to be entered, reproducing the deadlock.
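The difference between the two versions comes down to which messages reach the localBroker.oneway(...) acknowledgement. The small Java sketch below models only the two branch conditions quoted in this issue; the method names are hypothetical and it assumes that the test's persistentDelivery = false makes the producer send non-persistent messages.

```java
/** Hypothetical model of the branch conditions quoted above; not ActiveMQ code. */
public class OnewayAckBranch {

    /** 5.4.2 condition: forward with oneway and ack locally when no response is required. */
    static boolean onewayAckPath542(boolean responseRequired) {
        return !responseRequired;
    }

    /** Trunk condition: only when alwaysSyncSend is off and the message is non-persistent. */
    static boolean onewayAckPathTrunk(boolean alwaysSyncSend, boolean persistent) {
        return !alwaysSyncSend && !persistent;
    }

    public static void main(String[] args) {
        // With alwaysSyncSend=false, only non-persistent messages reach the
        // localBroker.oneway(...) ack, so the test must send non-persistent messages.
        System.out.println("persistent:     " + onewayAckPathTrunk(false, true));
        System.out.println("non-persistent: " + onewayAckPathTrunk(false, false));
    }
}
```

In this model a persistent message never takes the oneway-ack path on trunk, which is consistent with the original test passing there until persistentDelivery was turned off.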
                
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java, BridgeDeadlockTest.java
> Addendum
> =========
> We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that three specific configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
> 2) The bridge must be configured to dispatch synchronously; this is so that message subscriptions are dispatched by the same thread that accesses the queue.
> 3) The message producers must be transacted; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).
> If any of these conditions is not present, deadlock (at least through this recreation) does not occur.	
> Through further testing 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] Commented: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12980318#action_12980318 ] 

Stirling Chow commented on AMQ-3127:
------------------------------------

Gary, I ran the tests in org.apache.activemq.network several times and they succeeded.  If after consultation with Rob it is decided that the patch is too broad, we could always patch just the DiscoveryNetworkConnector/DemandForwardingBridgeSupport so that they make the localTransport asynchronous.

-------------------------------------------------------
 T E S T S
-------------------------------------------------------
Running org.apache.activemq.network.NetworkLoadTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 65.782 sec
Running org.apache.activemq.network.DemandForwardingBridgeFilterTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 24.859 sec
Running org.apache.activemq.network.NetworkReconnectTest
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 6.391 sec
Running org.apache.activemq.network.DuplexNetworkTest
Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 77.36 sec
Running org.apache.activemq.network.ForwardingBridgeTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 8.5 sec
Running org.apache.activemq.network.SimpleNetworkTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 73.5 sec
Running org.apache.activemq.network.DuplexNetworkMBeanTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 29.532 sec
Running org.apache.activemq.network.NetworkRemovesSubscriptionsTest
Tests run: 5, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 34.406 sec
Running org.apache.activemq.network.DemandForwardingBridgeTest
Tests run: 6, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 4.703 sec
Running org.apache.activemq.network.FailoverStaticNetworkTest
Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 32.203 sec
Running org.apache.activemq.network.NetworkBrokerDetachTest
Tests run: 2, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 22.501 sec

Results :

Tests run: 41, Failures: 0, Errors: 0, Skipped: 0


> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] Updated: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3127:
-------------------------------

    Attachment: BridgeDeadlockTest.java

Unit test demonstrating deadlock.  The unit test passes after applying the patch.

> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>
> Symptom
> =======
> We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically, we have a catastrophic (cause still unknown) network outage that only affects the outbound bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound bridges appear to be recreated properly, but messages produced by the affected broker to *some* of its shared queues/topics are no longer dispatched to the remote brokers.
> We have verified that the cause of this issue exists in AMQ 5.4.2.
> Cause
> =====
> Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService threads, which was dispatching a message across an outbound bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was receiving a new subscription from the outbound bridge:
> Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: Object  (id=104)	
> 	owns: Object  (id=105)	
> 	owns: Object  (id=106)	
> 	owns: Queue$3  (id=107)	
> 	waiting for: Object  (id=108)	
> 		owned by: Daemon Thread [VMTransport] (Running)	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738	
> 	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
> 	ResponseCorrelator.onCommand(Object) line: 116	
> 	MutexTransport(TransportFilter).onCommand(Object) line: 69	
> 	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
> 	VMTransport.oneway(Object) line: 113	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
> 	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
> 	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
> 	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
> 	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
> 	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
> 	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
> 	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
> 	Queue.pageInMessages(boolean) line: 1898	
> 	Queue.iterate() line: 1425	
> 	PooledTaskRunner.runTask() line: 122	
> 	PooledTaskRunner$1.run() line: 43	
> 	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
> 	ThreadPoolExecutor$Worker.run() line: 908	
> 	Thread.run() line: 662	
> Daemon Thread [VMTransport] (Suspended)	
> 	owns: Object  (id=499)	
> 	owns: RegionBroker$1  (id=205)	
> 		waited by: Daemon Thread [VMTransport] (Running)	
> 		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
> 	owns: Object  (id=108)	
> 		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: URI  (id=500)	
> 	Unsafe.park(boolean, long) line: not available [native method]	
> 	LockSupport.park(Object) line: 158	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
> 	ReentrantReadWriteLock$WriteLock.lock() line: 807	
> 	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
> 	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290	
> 	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444	
> 	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
> 	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
> 	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95	
> 	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550	
> 	ConsumerInfo.visit(CommandVisitor) line: 349	
> 	
> Specifically, a message had been produced to one of the shared queues and was being dispatched to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired the pagedInPendingDispatchLock from Queue.java:
>     private void doDispatch(List<QueueMessageReference> list) throws Exception {
>         boolean doWakeUp = false;
>         pagedInPendingDispatchLock.writeLock().lock();
> 	
> BrokerService had sent the message to the remote broker and was then acknowledging the local transport in DemandForwardingBridgeSupport.java:
>     protected void serviceLocalCommand(Command command) {
>     ...
>                         if (!message.isResponseRequired()) {
>                             
>                             // If the message was originally sent using async
>                             // send, we will preserve that QOS
>                             // by bridging it using an async send (small chance
>                             // of message loss).
>                             try {
>                                 remoteBroker.oneway(message);
>                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
> Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the write mutex in MutexTransport.java:
>     public void oneway(Object command) throws IOException {
>         synchronized (writeMutex) {
>             next.oneway(command);
>         }
>     }
> So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock and was trying to acquire MutexTransport.writeMutex.
> At the same time, a new remote consumer was being registered through the same outbound bridge through which the aforementioned dispatch was occurring.  The bridge's remote transport listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:
>     protected void addSubscription(DemandSubscription sub) throws IOException {
>         if (sub != null) {
>             localBroker.oneway(sub.getLocalInfo());
>         }
>     }
> Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.  Registration of consumers to a queue is synchronized with the dispatching of messages, as shown in Queue.java:
>     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
>         super.addSubscription(context, sub);
>         // synchronize with dispatch method so that no new messages are sent
>         // while setting up a subscription. avoid out of order messages,
>         // duplicates, etc.
>         pagedInPendingDispatchLock.writeLock().lock();
> So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
> Solution
> ======
> Deadlock can be avoided by making the local transport asynchronous, which would allow the remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included a unit test that passes with this change.
> There is no clear reason why the local transport is synchronous.  This is enforced by BrokerService.java when it starts the network connectors:
>     protected void startAllConnectors() throws Exception {
> ....
>             URI uri = getVmConnectorURI();
>             Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
>             map.put("network", "true");
>             map.put("async", "false");
> This change was made by the following checkin, but no rationale was given:
> Revision: 553094
> Author: rajdavies
> Date: 11:33:48 PM, July 3, 2007
> Message:
> set async=false for network connectors
> ----
> Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
> Addendum
> =========
> We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that two specific configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
> 2) The message producers must be transacted; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).
> If either of these conditions is not present, deadlock (at least through this recreation) does not occur.	
> Through further testing 



[jira] [Updated] (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3127:
-------------------------------

    Attachment: BridgeDeadlockTest.java
    
> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java, BridgeDeadlockTest.java
>
>
> Symptom
> =======
> We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically, we have a catastrophic (cause still unknown) network outage that only affects the outbound bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound bridges appear to be recreated properly, but messages produced by the affected broker to *some* of its shared queues/topics are no longer dispatched to the remote brokers.
> We have verified that the cause of this issue exists in AMQ 5.4.2.
> Cause
> =====
> Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService threads, which was dispatching a message across an outbound bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was receiving a new subscription from the outbound bridge:
> Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: Object  (id=104)	
> 	owns: Object  (id=105)	
> 	owns: Object  (id=106)	
> 	owns: Queue$3  (id=107)	
> 	waiting for: Object  (id=108)	
> 		owned by: Daemon Thread [VMTransport] (Running)	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738	
> 	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
> 	ResponseCorrelator.onCommand(Object) line: 116	
> 	MutexTransport(TransportFilter).onCommand(Object) line: 69	
> 	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
> 	VMTransport.oneway(Object) line: 113	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
> 	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
> 	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
> 	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
> 	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
> 	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
> 	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
> 	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
> 	Queue.pageInMessages(boolean) line: 1898	
> 	Queue.iterate() line: 1425	
> 	PooledTaskRunner.runTask() line: 122	
> 	PooledTaskRunner$1.run() line: 43	
> 	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
> 	ThreadPoolExecutor$Worker.run() line: 908	
> 	Thread.run() line: 662	
> Daemon Thread [VMTransport] (Suspended)	
> 	owns: Object  (id=499)	
> 	owns: RegionBroker$1  (id=205)	
> 		waited by: Daemon Thread [VMTransport] (Running)	
> 		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
> 	owns: Object  (id=108)	
> 		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: URI  (id=500)	
> 	Unsafe.park(boolean, long) line: not available [native method]	
> 	LockSupport.park(Object) line: 158	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
> 	ReentrantReadWriteLock$WriteLock.lock() line: 807	
> 	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
> 	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290	
> 	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444	
> 	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
> 	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
> 	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95	
> 	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550	
> 	ConsumerInfo.visit(CommandVisitor) line: 349	
> 	
> Specifically, a message had been produced to one of the shared queues and was being dispatched to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired the pagedInPendingDispatchLock from Queue.java:
>     private void doDispatch(List<QueueMessageReference> list) throws Exception {
>         boolean doWakeUp = false;
>         pagedInPendingDispatchLock.writeLock().lock();
> 	
> BrokerService had sent the message to the remote broker and was then acknowledging the local transport in DemandForwardingBridgeSupport.java:
>     protected void serviceLocalCommand(Command command) {
>     ...
>                         if (!message.isResponseRequired()) {
>                             
>                             // If the message was originally sent using async
>                             // send, we will preserve that QOS
>                             // by bridging it using an async send (small chance
>                             // of message loss).
>                             try {
>                                 remoteBroker.oneway(message);
>                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
> Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the write mutex in MutexTransport.java:
>     public void oneway(Object command) throws IOException {
>         synchronized (writeMutex) {
>             next.oneway(command);
>         }
>     }
> So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock and was trying to acquire MutexTransport.writeMutex.
> At the same time, a new remote consumer was being registered through the same outbound bridge through which the aforementioned dispatch was occurring.  The bridge's remote transport listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:
>     protected void addSubscription(DemandSubscription sub) throws IOException {
>         if (sub != null) {
>             localBroker.oneway(sub.getLocalInfo());
>         }
>     }
> Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.  Registration of consumers to a queue is synchronized with the dispatching of messages, as shown in Queue.java:
>     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
>         super.addSubscription(context, sub);
>         // synchronize with dispatch method so that no new messages are sent
>         // while setting up a subscription. avoid out of order messages,
>         // duplicates, etc.
>         pagedInPendingDispatchLock.writeLock().lock();
> So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
> Solution
> ======
> Deadlock can be avoided by making the local transport asynchronous, which would allow the remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included a unit test that passes with this change.
> There is no clear reason why the local transport is synchronous.  This is enforced by BrokerService.java when it starts the network connectors:
>     protected void startAllConnectors() throws Exception {
> ....
>             URI uri = getVmConnectorURI();
>             Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
>             map.put("network", "true");
>             map.put("async", "false");
> This change was made by the following checkin, but no rationale was given:
> Revision: 553094
> Author: rajdavies
> Date: 11:33:48 PM, July 3, 2007
> Message:
> set async=false for network connectors
> ----
> Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
> Addendum
> =========
> We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that three specific configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
> 2) The bridge must be configured to dispatch synchronously; this is so that messages are dispatched to subscriptions by the same thread that accesses the queue.
> 3) The message producers must be transactionalized; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).
> If any of these conditions is not present, deadlock (at least through this recreation) does not occur.	
> Through further testing 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

[jira] Updated: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3127:
-------------------------------

    Attachment: AMQ-3127.diff

Patch that resolves deadlock by making the local transport for a bridge asynchronous.

> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>
> Symptom
> =======
> We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically, we have a catastrophic (cause still unknown) network outage that only affects the outbound bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound bridges appear to be recreated properly, but messages produced by the affected broker to *some* of its shared queues/topics are no longer dispatched to the remote brokers.
> We have verified that the cause of this issue exists in AMQ 5.4.2.
> Cause
> =====
> Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService threads, which was dispatching a message across an outbound bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was receiving a new subscription from the outbound bridge:
> Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: Object  (id=104)	
> 	owns: Object  (id=105)	
> 	owns: Object  (id=106)	
> 	owns: Queue$3  (id=107)	
> 	waiting for: Object  (id=108)	
> 		owned by: Daemon Thread [VMTransport] (Running)	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738	
> 	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
> 	ResponseCorrelator.onCommand(Object) line: 116	
> 	MutexTransport(TransportFilter).onCommand(Object) line: 69	
> 	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
> 	VMTransport.oneway(Object) line: 113	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
> 	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
> 	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
> 	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
> 	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
> 	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
> 	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
> 	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
> 	Queue.pageInMessages(boolean) line: 1898	
> 	Queue.iterate() line: 1425	
> 	PooledTaskRunner.runTask() line: 122	
> 	PooledTaskRunner$1.run() line: 43	
> 	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
> 	ThreadPoolExecutor$Worker.run() line: 908	
> 	Thread.run() line: 662	
> Daemon Thread [VMTransport] (Suspended)	
> 	owns: Object  (id=499)	
> 	owns: RegionBroker$1  (id=205)	
> 		waited by: Daemon Thread [VMTransport] (Running)	
> 		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
> 	owns: Object  (id=108)	
> 		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: URI  (id=500)	
> 	Unsafe.park(boolean, long) line: not available [native method]	
> 	LockSupport.park(Object) line: 158	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
> 	ReentrantReadWriteLock$WriteLock.lock() line: 807	
> 	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
> 	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290	
> 	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444	
> 	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
> 	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
> 	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95	
> 	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550	
> 	ConsumerInfo.visit(CommandVisitor) line: 349	
> 	
> Specifically, a message had been produced to one of the shared queues and was being dispatched to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired the pagedInPendingDispatchLock lock from Queue.java:
>     private void doDispatch(List<QueueMessageReference> list) throws Exception {
>         boolean doWakeUp = false;
>         pagedInPendingDispatchLock.writeLock().lock();
> 	
> BrokerService had sent the message to the remote broker and was then acknowledging it on the local transport in DemandForwardingBridgeSupport.java:
>     protected void serviceLocalCommand(Command command) {
>     ...
>                         if (!message.isResponseRequired()) {
>                             
>                             // If the message was originally sent using async
>                             // send, we will preserve that QOS
>                             // by bridging it using an async send (small chance
>                             // of message loss).
>                             try {
>                                 remoteBroker.oneway(message);
>                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
> Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the write mutex in MutexTransport.java:
>     public void oneway(Object command) throws IOException {
>         synchronized (writeMutex) {
>             next.oneway(command);
>         }
>     }
> So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock and was trying to acquire MutexTransport.writeMutex.
> At the same time, a new remote consumer was being registered through the same outbound bridge through which the aforementioned dispatch was occurring.  The bridge's remote transport listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:
>     protected void addSubscription(DemandSubscription sub) throws IOException {
>         if (sub != null) {
>             localBroker.oneway(sub.getLocalInfo());
>         }
>     }
> Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.  Registration of consumers to a queue is synchronized with the dispatching of messages, as shown in Queue.java:
>     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
>         super.addSubscription(context, sub);
>         // synchronize with dispatch method so that no new messages are sent
>         // while setting up a subscription. avoid out of order messages,
>         // duplicates, etc.
>         pagedInPendingDispatchLock.writeLock().lock();
> So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
> Solution
> ======
> Deadlock can be avoided by making the local transport asynchronous, which would allow the remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included a unit test that passes with this change.
> There is no clear reason why the local transport is synchronous.  This is enforced by BrokerService.java when it starts the network connectors:
>     protected void startAllConnectors() throws Exception {
> ....
>             URI uri = getVmConnectorURI();
>             Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
>             map.put("network", "true");
>             map.put("async", "false");
> This change was made by the following checkin, but no rationale was given:
> Revision: 553094
> Author: rajdavies
> Date: 11:33:48 PM, July 3, 2007
> Message:
> set async=false for network connectors
> ----
> Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
> Addendum
> =========
> We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that two specific configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
> 2) The message producers must be transactionalized; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).
> If either of these conditions is not present, deadlock (at least through this recreation) does not occur.	
> Through further testing 



[jira] [Reopened] (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow reopened AMQ-3127:
--------------------------------


Attaching a slightly modified unit test that demonstrates the deadlock is still present on trunk code.
                

[jira] Updated: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3127:
-------------------------------

    Description: 
Symptom
=======
We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically, we have a catastrophic (cause still unknown) network outage that only affects the outbound bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound bridges appear to be recreated properly, but messages produced by the affected broker to *some* of its shared queues/topics are no longer dispatched to the remote brokers.

We have verified that the cause of this issue exists in AMQ 5.4.2.

Cause
=====
Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService threads, which was dispatching a message across an outbound bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was receiving a new subscription from the outbound bridge:

Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: Object  (id=104)	
	owns: Object  (id=105)	
	owns: Object  (id=106)	
	owns: Queue$3  (id=107)	
	waiting for: Object  (id=108)	
		owned by: Daemon Thread [VMTransport] (Running)	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738	
	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
	ResponseCorrelator.onCommand(Object) line: 116	
	MutexTransport(TransportFilter).onCommand(Object) line: 69	
	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
	VMTransport.oneway(Object) line: 113	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
	Queue.pageInMessages(boolean) line: 1898	
	Queue.iterate() line: 1425	
	PooledTaskRunner.runTask() line: 122	
	PooledTaskRunner$1.run() line: 43	
	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
	ThreadPoolExecutor$Worker.run() line: 908	
	Thread.run() line: 662	

Daemon Thread [VMTransport] (Suspended)	
	owns: Object  (id=499)	
	owns: RegionBroker$1  (id=205)	
		waited by: Daemon Thread [VMTransport] (Running)	
		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
	owns: Object  (id=108)	
		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: URI  (id=500)	
	Unsafe.park(boolean, long) line: not available [native method]	
	LockSupport.park(Object) line: 158	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
	ReentrantReadWriteLock$WriteLock.lock() line: 807	
	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290	
	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444	
	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95	
	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550	
	ConsumerInfo.visit(CommandVisitor) line: 349	
	
Specifically, a message had been produced to one of the shared queues and was being dispatched to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired the pagedInPendingDispatchLock lock from Queue.java:

    private void doDispatch(List<QueueMessageReference> list) throws Exception {
        boolean doWakeUp = false;

        pagedInPendingDispatchLock.writeLock().lock();
	
BrokerService had sent the message to the remote broker and was then acknowledging it on the local transport in DemandForwardingBridgeSupport.java:

    protected void serviceLocalCommand(Command command) {
    ...
                        if (!message.isResponseRequired()) {
                            
                            // If the message was originally sent using async
                            // send, we will preserve that QOS
                            // by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));

Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the write mutex in MutexTransport.java:

    public void oneway(Object command) throws IOException {
        synchronized (writeMutex) {
            next.oneway(command);
        }
    }

So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock and was trying to acquire MutexTransport.writeMutex.

At the same time, a new remote consumer was being registered through the same outbound bridge through which the aforementioned dispatch was occurring.  The bridge's remote transport listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:

    protected void addSubscription(DemandSubscription sub) throws IOException {
        if (sub != null) {
            localBroker.oneway(sub.getLocalInfo());
        }
    }

Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.  Registration of consumers to a queue is synchronized with the dispatching of messages, as shown in Queue.java:

    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
        super.addSubscription(context, sub);
        // synchronize with dispatch method so that no new messages are sent
        // while setting up a subscription. avoid out of order messages,
        // duplicates, etc.
        pagedInPendingDispatchLock.writeLock().lock();

So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
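The cycle described above is a classic lock-ordering deadlock. One thread takes the dispatch lock and then the write mutex. The other takes them in the opposite order.

The sketch below reproduces that ordering outside ActiveMQ. A plain `ReentrantReadWriteLock` and an `Object` monitor serve as hypothetical stand-ins for `Queue.pagedInPendingDispatchLock` and `MutexTransport.writeMutex`. The JVM is then asked to find the cycle with `ThreadMXBean.findDeadlockedThreads()`. That method is used instead of `findMonitorDeadlockedThreads()` because one side of the cycle is a `java.util.concurrent` lock, not a monitor. The thread names and the latch choreography are illustrative assumptions, not taken from the attached unit test.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class BridgeLockCycleSketch {

    /** Returns how many deadlocked threads the JVM reports within timeoutMs (0 if none). */
    static int reproduce(long timeoutMs) {
        // Hypothetical stand-ins, not the ActiveMQ fields themselves.
        ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock();
        Object writeMutex = new Object();
        // Each thread waits here until both hold their first lock, forcing the bad interleaving.
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        // Like the BrokerService task: dispatch lock first, then the transport write mutex.
        Thread dispatcher = new Thread(() -> {
            pagedInPendingDispatchLock.writeLock().lock();
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (writeMutex) {
                    // not reached once the cycle has formed
                }
            } finally {
                pagedInPendingDispatchLock.writeLock().unlock();
            }
        }, "BrokerService-Task");

        // Like the VMTransport thread: write mutex first, then the dispatch lock.
        Thread registrar = new Thread(() -> {
            synchronized (writeMutex) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                pagedInPendingDispatchLock.writeLock().lock();
                pagedInPendingDispatchLock.writeLock().unlock();
            }
        }, "VMTransport");

        // Daemon threads, so the stuck pair does not keep the JVM alive.
        dispatcher.setDaemon(true);
        registrar.setDaemon(true);
        dispatcher.start();
        registrar.start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            // Detects cycles through monitors and through ownable synchronizers such as this write lock.
            long[] ids = threads.findDeadlockedThreads();
            if (ids != null) {
                return ids.length;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return 0;
            }
        }
        return 0;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlocked threads: " + reproduce(5_000));
    }
}
```

In this sketch both threads end up in the reported cycle. This is the same owner/waiter relationship the debugger view above shows between the BrokerService task and the VMTransport thread.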

Solution
========
Deadlock can be avoided by making the local transport asynchronous, which would allow the remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included a unit test that passes with this change.
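The following is a minimal, JDK-only sketch of the proposed fix. It assumes that an asynchronous VMTransport behaves like a single-threaded executor that processes incoming commands on its own thread. While holding `writeMutex`, the transport thread only enqueues the subscription, so it never waits for the queue lock while holding the mutex. `AsyncHandOffDemo` and `peerListener` are hypothetical names, and the locks are stand-ins rather than ActiveMQ classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the proposed fix; no ActiveMQ classes are used.
// peerListener stands in for the peer thread of an asynchronous VMTransport.
final class AsyncHandOffDemo {
    private final ReentrantReadWriteLock queueLock = new ReentrantReadWriteLock(); // ~ pagedInPendingDispatchLock
    private final Object writeMutex = new Object();                               // ~ MutexTransport.writeMutex
    private final ExecutorService peerListener = Executors.newSingleThreadExecutor();

    /** Returns true if both the dispatch and the subscription registration finish. */
    boolean runOnce() {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        AtomicReference<Future<?>> registration = new AtomicReference<>();

        // Dispatch thread: unchanged lock order (queue lock, then transport mutex).
        Thread dispatcher = new Thread(() -> {
            queueLock.writeLock().lock();
            try {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                synchronized (writeMutex) {
                    // the MessageAck would be written to the local transport here
                }
            } finally {
                queueLock.writeLock().unlock();
            }
        });

        // Transport thread: holds the mutex only long enough to enqueue the command.
        Thread transport = new Thread(() -> {
            synchronized (writeMutex) {
                bothHoldFirstLock.countDown();
                awaitQuietly(bothHoldFirstLock);
                registration.set(peerListener.submit(() -> {
                    // Runs on the peer thread, after writeMutex has been released.
                    queueLock.writeLock().lock();
                    try {
                        // Queue.addSubscription work would happen here
                    } finally {
                        queueLock.writeLock().unlock();
                    }
                }));
            }
        });

        dispatcher.setDaemon(true);
        transport.setDaemon(true);
        dispatcher.start();
        transport.start();
        try {
            dispatcher.join(2000);
            transport.join(2000);
            Future<?> f = registration.get();
            if (f == null) {
                return false;
            }
            f.get(2, TimeUnit.SECONDS); // TimeoutException if the registration never ran
            return !dispatcher.isAlive() && !transport.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } finally {
            peerListener.shutdownNow();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("completed: " + new AsyncHandOffDemo().runOnce());
    }
}
```

With the same forced interleaving, both threads and the queued registration finish. In this model, the trade-off is that the subscription is added slightly later and on another thread, rather than inline on the transport thread.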

There is no clear reason why the local transport is synchronous.  This is enforced by BrokerService.java when it starts the network connectors:

    protected void startAllConnectors() throws Exception {
....
            URI uri = getVmConnectorURI();
            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");

This change was made by the following checkin, but no rationale was given:

Revision: 553094
Author: rajdavies
Date: 11:33:48 PM, July 3, 2007
Message:
set async=false for network connectors
----
Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
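The parameter handling in `startAllConnectors` merges the VM connector URI's existing query parameters with `network=true` and `async=false`. The sketch below is a rough, JDK-only illustration of that kind of merge. It is not ActiveMQ's `URISupport`, whose exact behavior is not reproduced here. `VmUriSketch`, `withOptions`, and the `vm://broker1` URI are hypothetical, and URI fragments are ignored.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

// Hypothetical stand-in for the parseParameters + map.put sequence in startAllConnectors;
// it is NOT ActiveMQ's URISupport and ignores URI fragments.
final class VmUriSketch {
    static URI withOptions(URI base, Map<String, String> overrides) {
        Map<String, String> params = new TreeMap<>(); // sorted for a stable query string
        String rawQuery = base.getRawQuery();
        if (rawQuery != null) {
            for (String pair : rawQuery.split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    params.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        params.putAll(overrides); // overrides win, as the later map.put(...) calls do
        String text = base.toString();
        int q = text.indexOf('?');
        String prefix = q >= 0 ? text.substring(0, q) : text;
        StringJoiner query = new StringJoiner("&");
        params.forEach((k, v) -> query.add(k + "=" + v));
        return URI.create(params.isEmpty() ? prefix : prefix + "?" + query);
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("network", "true");
        overrides.put("async", "false");
        // prints vm://broker1?async=false&network=true
        System.out.println(withOptions(URI.create("vm://broker1"), overrides));
    }
}
```

In this sketch `putAll` runs last, so an `async=true` already present on the base URI is overridden. This mirrors how the `map.put("async", "false")` call forces a synchronous local transport.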

Addendum
=========
We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that three specific configurations must exist to create the deadlock:

1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
2) The bridge must be configured to dispatch synchronously; this is so that messages are dispatched to subscriptions by the same thread that accesses the queue.
3) The message producers must be transacted; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).

If any of these conditions is not present, deadlock (at least through this recreation) does not occur.	

Through further testing 

  was:
Symptom
=======
We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically, we have a catastrophic (cause still unknown) network outage that only affects the outbound bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound bridges appear to be recreated properly, but messages produced by the affected broker to *some* of its shared queues/topics are no longer dispatched to the remote brokers.

We have verified that the cause of this issue exists in AMQ 5.4.2.

Cause
=====
Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService threads, which was dispatching a message across an outbound bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was receiving a new subscription from the outbound bridge:

Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: Object  (id=104)	
	owns: Object  (id=105)	
	owns: Object  (id=106)	
	owns: Queue$3  (id=107)	
	waiting for: Object  (id=108)	
		owned by: Daemon Thread [VMTransport] (Running)	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738	
	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
	ResponseCorrelator.onCommand(Object) line: 116	
	MutexTransport(TransportFilter).onCommand(Object) line: 69	
	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
	VMTransport.oneway(Object) line: 113	
	MutexTransport.oneway(Object) line: 40	
	ResponseCorrelator.oneway(Object) line: 60	
	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
	Queue.pageInMessages(boolean) line: 1898	
	Queue.iterate() line: 1425	
	PooledTaskRunner.runTask() line: 122	
	PooledTaskRunner$1.run() line: 43	
	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
	ThreadPoolExecutor$Worker.run() line: 908	
	Thread.run() line: 662	

Daemon Thread [VMTransport] (Suspended)	
	owns: Object  (id=499)	
	owns: RegionBroker$1  (id=205)	
		waited by: Daemon Thread [VMTransport] (Running)	
		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
	owns: Object  (id=108)	
		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
	owns: URI  (id=500)	
	Unsafe.park(boolean, long) line: not available [native method]	
	LockSupport.park(Object) line: 158	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842	
	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
	ReentrantReadWriteLock$WriteLock.lock() line: 807	
	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290	
	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444	
	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95	
	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550	
	ConsumerInfo.visit(CommandVisitor) line: 349	
	
Specifically, a message had been produced to one of the shared queues and was being dispatched to a remote consumer by the BrokerService thread.  In doing so, BrokerService had acquired pagedInPendingDispatchLock in Queue.java:

    private void doDispatch(List<QueueMessageReference> list) throws Exception {
        boolean doWakeUp = false;

        pagedInPendingDispatchLock.writeLock().lock();
	
BrokerService had sent the message to the remote broker and was then acknowledging it on the local transport in DemandForwardingBridgeSupport.java:

    protected void serviceLocalCommand(Command command) {
    ...
                        if (!message.isResponseRequired()) {
                            
                            // If the message was originally sent using async
                            // send, we will preserve that QOS
                            // by bridging it using an async send (small chance
                            // of message loss).
                            try {
                                remoteBroker.oneway(message);
                                localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));

Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the write mutex in MutexTransport.java:

    public void oneway(Object command) throws IOException {
        synchronized (writeMutex) {
            next.oneway(command);
        }
    }

So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock and was trying to acquire MutexTransport.writeMutex.

At the same time, a new remote consumer was being registered over the same outbound bridge on which the aforementioned dispatch was occurring.  The bridge's remote transport listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:

    protected void addSubscription(DemandSubscription sub) throws IOException {
        if (sub != null) {
            localBroker.oneway(sub.getLocalInfo());
        }
    }

Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.  Registration of consumers to a queue is synchronized with the dispatching of messages, as shown in Queue.java:

    public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
        super.addSubscription(context, sub);
        // synchronize with dispatch method so that no new messages are sent
        // while setting up a subscription. avoid out of order messages,
        // duplicates, etc.
        pagedInPendingDispatchLock.writeLock().lock();

So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.

Solution
========
Deadlock can be avoided by making the local transport asynchronous, which would allow the remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included a unit test that passes with this change.

There is no clear reason why the local transport is synchronous.  This is enforced by BrokerService.java when it starts the network connectors:

    protected void startAllConnectors() throws Exception {
....
            URI uri = getVmConnectorURI();
            Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
            map.put("network", "true");
            map.put("async", "false");

This change was made by the following checkin, but no rationale was given:

Revision: 553094
Author: rajdavies
Date: 11:33:48 PM, July 3, 2007
Message:
set async=false for network connectors
----
Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java

Addendum
=========
We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that two specific configurations must exist to create the deadlock:

1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
2) The message producers must be transacted; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).

If either of these conditions is not present, deadlock (at least through this recreation) does not occur.	

Through further testing 


> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


[jira] [Closed] (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Timothy Bish (Closed) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timothy Bish closed AMQ-3127.
-----------------------------

    Resolution: Cannot Reproduce

Ran the test repeatedly and could not reproduce using the latest trunk code.
                
> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>
> Symptom
> =======
> We have an AMQ 5.3.1 production environment with 7 brokers networked over HTTP using the DiscoveryNetworkConnector and SimpleDiscoveryAgent.  The brokers share a number of topics and queues.  Periodically, we have a catastrophic (cause still uknown) network outage that only affects the outbound bridges from one of the 7 brokers.  The affected broker detects the outage, stops the existing 6 outbound bridges, and starts 6 new outbound bridges.  Frequently, we find that the outbound bridges appear to be recreated properly, but messages produced by the affected broker to *some* of its shared queues/topics are no longer dispatched to the remote brokers.
> We have verified that the cause of this issue exists in AMQ 5.4.2.
> Cause
> =====
> Analysis of the affected broker's threads revealed a deadlock between one of the BrokerService threads, which was dispatching a message across an outbound bridge, and a transport thread (e.g., VMTransport or HTTP Reader) that was receiving a new subscriptions from the outbound bridge:
> Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: Object  (id=104)	
> 	owns: Object  (id=105)	
> 	owns: Object  (id=106)	
> 	owns: Queue$3  (id=107)	
> 	waiting for: Object  (id=108)	
> 		owned by: Daemon Thread [VMTransport] (Running)	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	DemandForwardingBridge(DemandForwardingBridgeSupport).serviceLocalCommand(Command) line: 738	
> 	DemandForwardingBridgeSupport$2.onCommand(Object) line: 161	
> 	ResponseCorrelator.onCommand(Object) line: 116	
> 	MutexTransport(TransportFilter).onCommand(Object) line: 69	
> 	VMTransport.dispatch(VMTransport, TransportListener, Object) line: 122	
> 	VMTransport.oneway(Object) line: 113	
> 	MutexTransport.oneway(Object) line: 40	
> 	ResponseCorrelator.oneway(Object) line: 60	
> 	ManagedTransportConnection(TransportConnection).dispatch(Command) line: 1249	
> 	ManagedTransportConnection(TransportConnection).processDispatch(Command) line: 810	
> 	ManagedTransportConnection(TransportConnection).dispatchSync(Command) line: 770	
> 	QueueSubscription(PrefetchSubscription).dispatch(MessageReference) line: 649	
> 	QueueSubscription(PrefetchSubscription).dispatchPending() line: 599	
> 	QueueSubscription(PrefetchSubscription).add(MessageReference) line: 156	
> 	Queue.doActualDispatch(List<QueueMessageReference>) line: 1798	
> 	Queue.doDispatch(List<QueueMessageReference>) line: 1745	
> 	Queue.pageInMessages(boolean) line: 1898	
> 	Queue.iterate() line: 1425	
> 	PooledTaskRunner.runTask() line: 122	
> 	PooledTaskRunner$1.run() line: 43	
> 	ThreadPoolExecutor$Worker.runTask(Runnable) line: 886	
> 	ThreadPoolExecutor$Worker.run() line: 908	
> 	Thread.run() line: 662	
> Daemon Thread [VMTransport] (Suspended)	
> 	owns: Object  (id=499)	
> 	owns: RegionBroker$1  (id=205)	
> 		waited by: Daemon Thread [VMTransport] (Running)	
> 		waited by: Daemon Thread [ActiveMQ Broker[broker1] Scheduler] (Running)	
> 	owns: Object  (id=108)	
> 		waited by: Daemon Thread [BrokerService[broker1] Task] (Suspended)	
> 	owns: URI  (id=500)	
> 	Unsafe.park(boolean, long) line: not available [native method]	
> 	LockSupport.park(Object) line: 158	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).parkAndCheckInterrupt() line: 811	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquireQueued(AbstractQueuedSynchronizer$Node, int) line: 842	
> 	ReentrantReadWriteLock$NonfairSync(AbstractQueuedSynchronizer).acquire(int) line: 1178	
> 	ReentrantReadWriteLock$WriteLock.lock() line: 807	
> 	Queue.addSubscription(ConnectionContext, Subscription) line: 360	
> 	ManagedQueueRegion(AbstractRegion).addConsumer(ConnectionContext, ConsumerInfo) line: 290	
> 	ManagedRegionBroker(RegionBroker).addConsumer(ConnectionContext, ConsumerInfo) line: 444	
> 	ManagedRegionBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 240	
> 	AdvisoryBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	AdvisoryBroker.addConsumer(ConnectionContext, ConsumerInfo) line: 91	
> 	CompositeDestinationBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	TransactionBroker(BrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 89	
> 	BrokerService$3(MutableBrokerFilter).addConsumer(ConnectionContext, ConsumerInfo) line: 95	
> 	ManagedTransportConnection(TransportConnection).processAddConsumer(ConsumerInfo) line: 550	
> 	ConsumerInfo.visit(CommandVisitor) line: 349	
> 	
> Specifically, a message had been produced to one of the shared queues and was being dispatched to a remote consumer by the BrokerService thread.  In so doing, BrokerService had acquired the pagedInPendingDispatchLock lock from Queue.java:
>     private void doDispatch(List<QueueMessageReference> list) throws Exception {
>         boolean doWakeUp = false;
>         pagedInPendingDispatchLock.writeLock().lock();
> 	
> BrokerService had sent the message to the remote broker was then acknowledging the local transport in DemandForwardingBridgeSupport.java:
>     protected void serviceLocalCommand(Command command) {
>     ...
>                         if (!message.isResponseRequired()) {
>                             
>                             // If the message was originally sent using async
>                             // send, we will preserve that QOS
>                             // by bridging it using an async send (small chance
>                             // of message loss).
>                             try {
>                                 remoteBroker.oneway(message);
>                                 localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
> Since localBroker was a synchronous VMTransport, BrokerService had to then acquire the write mutex in MutexTransport.java:
>     public void oneway(Object command) throws IOException {
>         synchronized (writeMutex) {
>             next.oneway(command);
>         }
>     }
> So the dispatching thread (BrokerService) had acquired Queue.pagedInPendingDispatchLock was trying to acquire MutexTransport.writeMutex.
> At the same time, a new remote consumer was being registered through the same outbound bridge through which the aforementioned dispatch was ocurring.  The bridge's remote transport listener thread (in this example, VMTransport) was adding the subscription through DemandForwardingBridgeSupport.java:
>     protected void addSubscription(DemandSubscription sub) throws IOException {
>         if (sub != null) {
>             localBroker.oneway(sub.getLocalInfo());
>         }
>     }
> Again, localBroker is synchronous, so the VMTransport thread acquired MutexTransport.writeMutex.  Registration of consumers to a queue is synchronized with the dispatching of messages, as shown in Queue.java:
>     public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
>         super.addSubscription(context, sub);
>         // synchronize with dispatch method so that no new messages are sent
>         // while setting up a subscription. avoid out of order messages,
>         // duplicates, etc.
>         pagedInPendingDispatchLock.writeLock().lock();
> So the remote transport listening thread (VMTransport) had acquired MutexTransport.writeMutex and was trying to acquire Queue.pagedInPendingDispatchLock, thus creating a deadlock with BrokerService.
> Solution
> ======
> Deadlock can be avoided by making the local transport asynchronous, which would allow the remote transport listener thread to acquire the MutexTransport.writeMutex, but then offload the acquisition of Queue.pagedInPendingDispatchLock to its peer listening thread.  We've included a unit test that passes with this change.
> There is no clear reason why the local transport is asynchronous.  This is enforced by BrokerService.java when it starts the network connectors:
>     protected void startAllConnectors() throws Exception {
> ....
>             URI uri = getVmConnectorURI();
>             Map<String, String> map = new HashMap<String, String>(URISupport.parseParameters(uri));
>             map.put("network", "true");
>             map.put("async", "false");
> This change was made by the following checkin, but no rational was given:
> Revision: 553094
> Author: rajdavies
> Date: 11:33:48 PM, July 3, 2007
> Message:
> set async=false for network connectors
> ----
> Modified : /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
> Addendum
> =========
> We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that three specific configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
> 2) The bridge must be configured to dispatch synchronously; this is so that messages to subscriptions are dispatched by the same thread that accesses the queue.
> 3) The message producers must be transacted; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).
> If any of these conditions is not present, deadlock (at least through this recreation) does not occur.
> Through further testing 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] Commented: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Gary Tully (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12980109#action_12980109 ] 

Gary Tully commented on AMQ-3127:
---------------------------------

Great investigative work. We'll ping Rob to see if he has any recollection of the reason/use case behind Revision 553094.
Did you run the other network connector unit tests (org.apache.activemq.network in activemq-core) with this fix?




[jira] Updated: (AMQ-3127) Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.

Posted by "Stirling Chow (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/AMQ-3127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stirling Chow updated AMQ-3127:
-------------------------------

    Attachment: BridgeDeadlockTest.java

Unit test (amended to enable a timeout) that demonstrates the deadlock.  The test passes after applying the patch.
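A deadlock-prone test is easier to diagnose when it fails on a timeout instead of hanging. The sketch below runs the test body with a timeout. On timeout, it asks the JVM's ThreadMXBean which threads are deadlocked. DeadlockWatchdog is a hypothetical helper, not the attached BridgeDeadlockTest, and its lock names are illustrative only:

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration; not part of ActiveMQ or the attached test.
final class DeadlockWatchdog {

    /** Null if the body finished in time; otherwise names of threads reported as deadlocked. */
    static List<String> deadlockedAfterTimeout(Runnable body, long timeoutMs) throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "test-body");
            t.setDaemon(true); // a hung body must not keep the JVM alive
            return t;
        });
        try {
            exec.submit(body).get(timeoutMs, TimeUnit.MILLISECONDS);
            return null;
        } catch (TimeoutException e) {
            ThreadMXBean mx = ManagementFactory.getThreadMXBean();
            // Covers both monitors and ReentrantLock-style synchronizers.
            long[] ids = mx.findDeadlockedThreads();
            // Poll briefly in case the threads are still entering lock().
            for (int i = 0; ids == null && i < 40; i++) {
                Thread.sleep(50);
                ids = mx.findDeadlockedThreads();
            }
            List<String> names = new ArrayList<>();
            if (ids != null) {
                for (ThreadInfo info : mx.getThreadInfo(ids)) {
                    if (info != null) names.add(info.getThreadName());
                }
            }
            return names;
        } finally {
            exec.shutdownNow();
        }
    }

    /** Stand-in test body: two daemon threads take two locks in opposite order. */
    static void deadlockingBody() {
        ReentrantLock dispatchLock = new ReentrantLock();
        ReentrantLock writeMutex = new ReentrantLock();
        CountDownLatch bothHoldFirst = new CountDownLatch(2);
        Thread a = lockInOrder("dispatcher", dispatchLock, writeMutex, bothHoldFirst);
        Thread b = lockInOrder("registrar", writeMutex, dispatchLock, bothHoldFirst);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // interrupted by shutdownNow()
        }
    }

    private static Thread lockInOrder(String name, ReentrantLock first,
                                      ReentrantLock second, CountDownLatch latch) {
        Thread t = new Thread(() -> {
            first.lock();
            try {
                latch.countDown();
                latch.await();
                second.lock(); // never returns: the other thread holds 'second'
                second.unlock();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                first.unlock();
            }
        }, name);
        t.setDaemon(true);
        return t;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("quick body: " + deadlockedAfterTimeout(() -> { }, 1000));
        System.out.println("deadlocking body: "
                + deadlockedAfterTimeout(DeadlockWatchdog::deadlockingBody, 300));
    }
}
```

The deadlocked threads are daemon threads, so they cannot be recovered, but they do not prevent the JVM from exiting after the report is printed.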

> Network bridge causes deadlock on queue/topic when message dispatch and consumer registration overlap.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-3127
>                 URL: https://issues.apache.org/jira/browse/AMQ-3127
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.4.2
>            Reporter: Stirling Chow
>            Priority: Critical
>         Attachments: AMQ-3127.diff, BridgeDeadlockTest.java
>
>
> Addendum
> =========
> We've included a unit test that demonstrates the deadlock 100% of the time on our systems.  Since this is a timing issue, you may need to run the unit test several times to create the deadlock.  Also note that two specific configurations must exist to create the deadlock:
> 1) The bridge must have conduit subscriptions disabled; this is so that there can be an existing subscription across the bridge to which messages are being dispatched while at the same time another subscription is being added.
> 2) The message producers must be transacted; this is so that the message dispatches require a response by the dispatch thread (i.e., BrokerService).
> If either of these conditions is not present, deadlock (at least through this recreation) does not occur.
> Through further testing 
