Posted to dev@activemq.apache.org by "Timothy Bish (JIRA)" <ji...@apache.org> on 2012/11/05 16:08:13 UTC

[jira] [Commented] (AMQ-4159) Race condition in SimpleDiscoveryAgent creates multiple concurrent threads attempting to connect to the same bridge --- can result in deadlock

    [ https://issues.apache.org/jira/browse/AMQ-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13490677#comment-13490677 ] 

Timothy Bish commented on AMQ-4159:
-----------------------------------

This seems sensible.  Nice catch.
                
> Race condition in SimpleDiscoveryAgent creates multiple concurrent threads attempting to connect to the same bridge --- can result in deadlock
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4159
>                 URL: https://issues.apache.org/jira/browse/AMQ-4159
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Stirling Chow
>            Assignee: Timothy Bish
>            Priority: Critical
>         Attachments: AMQ4159.patch, AMQ4159Test.java
>
>
> Symptom
> =======
> I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed that during one of the tests, concurrent calls were being made to {{DiscoveryNetworkConnector.onServiceAdd(...)}} for the same {{DiscoveryEvent}}.  This was unexpected because only a single service (URL) had been given to {{SimpleDiscoveryAgent}}.  In fact, during one of the tests I observed dozens of concurrent calls.
> Concurrent attempts to establish a bridge to the *same* remote broker are problematic because they expose a number of race conditions in {{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent bridge failure (see AMQ-4160), as well as unnecessary thread pool execution/resource usage and logging.
> The issues with {{DiscoveryNetworkConnector}} and {{RegionBroker}} will be filed as separate issues.  This issue specifically addresses the bug that causes {{SimpleDiscoveryAgent}} to uncontrollably multiply bridge connection attempts.
> Cause
> =====
> When {{DemandForwardingBridgeSupport}} handles exceptions from either the local or the remote side of the bridge, it fires a "bridge failed" event:
> {code:title=DemandForwardingBridgeSupport.java}
> public void serviceLocalException(Throwable error) {
>     if (!disposed.get()) {
>         LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
>         LOG.debug("The local Exception was:" + error, error);
>         brokerService.getTaskRunnerFactory().execute(new Runnable() {
>             public void run() {
>                 ServiceSupport.dispose(getControllingService());
>             }
>         });
>         fireBridgeFailed();
>     }
> }
> public void serviceRemoteException(Throwable error) {
>     if (!disposed.get()) {
>         if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
>             LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>         } else {
>             LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>         }
>         LOG.debug("The remote Exception was: " + error, error);
>         brokerService.getTaskRunnerFactory().execute(new Runnable() {
>             public void run() {
>                 ServiceSupport.dispose(getControllingService());
>             }
>         });
>         fireBridgeFailed();
>     }
> }
> private void fireBridgeFailed() {
>     NetworkBridgeListener l = this.networkBridgeListener;
>     if (l != null) {
>         l.bridgeFailed();
>     }
> }
> {code}
> {{DiscoveryNetworkConnector}} is the {{NetworkBridgeListener}}, and its {{bridgeFailed()}} method calls back to {{SimpleDiscoveryAgent.serviceFailed(...)}}:
> {code:title=DiscoveryNetworkConnector.java}
> protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
>     class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
>         public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
>             super(brokerService, connectorName);
>         }
>         public void bridgeFailed() {
>             if (!serviceSupport.isStopped()) {
>                 try {
>                     discoveryAgent.serviceFailed(event);
>                 } catch (IOException e) {
>                 }
>             }
>         }
>     }
> ...
> {code}
> In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the {{reconnectDelay}} before attempting to re-establish the bridge via {{DiscoveryNetworkConnector.onServiceAdd(...)}}:
> {code:title=SimpleDiscoveryAgent.java}
> public void serviceFailed(DiscoveryEvent devent) throws IOException {
>     final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
>     if (event.failed.compareAndSet(false, true)) {
>         listener.onServiceRemove(event);
>         taskRunner.execute(new Runnable() {
>             public void run() {
>                 // We detect a failed connection attempt because the service
>                 // fails right
>                 // away.
>                 if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
>                     LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
> ...
>                     synchronized (sleepMutex) {
>                         try {
>                             if (!running.get()) {
>                                 LOG.debug("Reconnecting disabled: stopped");
>                                 return;
>                             }
>                             LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
>                             sleepMutex.wait(event.reconnectDelay);
>                         } catch (InterruptedException ie) {
>                             LOG.debug("Reconnecting disabled: " + ie);
>                             Thread.currentThread().interrupt();
>                             return;
>                         }
>                     }
> ...
>                 event.connectTime = System.currentTimeMillis();
>                 event.failed.set(false);
>                 listener.onServiceAdd(event);
>             }
>         }, "Simple Discovery Agent");
>     }
> }
> {code}
> *NOTE*: the call to {{listener.onServiceAdd(...)}} is made by a new thread!
> There are two race conditions that allow {{SimpleDiscoveryAgent.serviceFailed(...)}} to launch more than one thread, each attempting to re-establish the same bridge.
> First, note that {{DemandForwardingBridgeSupport.serviceLocal/RemoteException(...)}} launches a separate thread that stops the bridge:
> {code:title=DemandForwardingBridgeSupport.java}
> public void serviceRemoteException(Throwable error) {
>     if (!disposed.get()) {
>         if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
>             LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>         } else {
>             LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
>         } 
>         LOG.debug("The remote Exception was: " + error, error);
>         brokerService.getTaskRunnerFactory().execute(new Runnable() {
>             public void run() {
>                 ServiceSupport.dispose(getControllingService());
>             }
>         });
>         fireBridgeFailed();
>     }
> }
> public void stop() throws Exception {
>     if (started.compareAndSet(true, false)) {
>         if (disposed.compareAndSet(false, true)) {
>             LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
>             NetworkBridgeListener l = this.networkBridgeListener;
>             if (l != null) {
>                 l.onStop(this);
>             }
> {code}
> When the bridge stops, the {{disposed}} flag is set, which prevents subsequent calls to {{serviceLocal/RemoteException(...)}} from calling {{fireBridgeFailed()}}.  However, since the call to {{DemandForwardingBridgeSupport.stop()}} is made by a separate thread, multiple {{serviceLocal/RemoteException(...)}} calls that are made in quick succession can result in multiple calls to {{fireBridgeFailed()}}.
> This is the first race condition: multiple calls can be made to {{DiscoveryNetworkConnector.bridgeFailed()}} for the same bridge.  By transitivity, this results in multiple calls to {{SimpleDiscoveryAgent.serviceFailed(...)}}.
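The first race can be reproduced deterministically in isolation. The sketch below is not ActiveMQ code: DeferredDisposeSketch and its pendingTasks queue are hypothetical stand-ins for the bridge and the task runner, and the queue is drained by hand to model a dispose thread that has not run yet.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the bridge: the handler only reads the guard,
// while the write happens later in a deferred task.
class DeferredDisposeSketch {
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    // Stand-in for the task runner; tasks run only when drain() is called.
    private final Queue<Runnable> pendingTasks = new ArrayDeque<>();
    private int bridgeFailedCalls;

    void serviceRemoteException(Throwable error) {
        if (!disposed.get()) {
            pendingTasks.add(() -> disposed.set(true)); // deferred dispose
            bridgeFailedCalls++;                        // models fireBridgeFailed()
        }
    }

    void drain() {
        Runnable task;
        while ((task = pendingTasks.poll()) != null) {
            task.run();
        }
    }

    // Counts the "bridge failed" events fired when the given number of
    // exceptions arrive before the deferred dispose task has run.
    static int failuresReported(int exceptionsBeforeDispose) {
        DeferredDisposeSketch bridge = new DeferredDisposeSketch();
        for (int i = 0; i < exceptionsBeforeDispose; i++) {
            bridge.serviceRemoteException(new RuntimeException("simulated " + i));
        }
        bridge.drain();
        // Arrives after disposal, so the guard filters it out.
        bridge.serviceRemoteException(new RuntimeException("late"));
        return bridge.bridgeFailedCalls;
    }

    public static void main(String[] args) {
        System.out.println(failuresReported(4)); // prints 4
    }
}
```

Every exception that arrives before the deferred task runs passes the guard, because nothing in the handler itself changes the flag.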
> {{SimpleDiscoveryAgent.serviceFailed(...)}} has a guard clause, {{event.failed.compareAndSet(false, true)}}, which should only allow the first call to launch a bridge reconnect thread.  However, once the {{reconnectDelay}} expires, {{event.failed}} is reset to {{false}}, which allows re-entry to the failure handling logic, and the possible launching of additional bridge reconnect threads if the {{reconnectDelay}} is short or the threads calling {{serviceFailed(...)}} are delayed.
> This is the second race condition: the guard clause in {{SimpleDiscoveryAgent.serviceFailed(...)}} can be reset before the subsequent redundant calls have been filtered out.
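The second race comes down to the order in which the guard is reset and the duplicate failure report arrives. The following is a single-threaded sketch of that ordering, assuming a simplified event that carries only the failed flag. GuardResetSketch, Event and reconnects(...) are hypothetical names, and the reconnect delay is modelled by running the queued task by hand.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the agent's failure handling, not ActiveMQ code.
class GuardResetSketch {
    static final class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    private final Queue<Runnable> reconnectTasks = new ArrayDeque<>();
    private int onServiceAddCalls;

    void serviceFailed(Event event) {
        if (event.failed.compareAndSet(false, true)) {
            reconnectTasks.add(() -> {
                event.failed.set(false); // guard reopened on the shared event
                onServiceAddCalls++;     // models listener.onServiceAdd(event)
            });
        }
    }

    void runPendingReconnects() {
        Runnable task;
        while ((task = reconnectTasks.poll()) != null) {
            task.run();
        }
    }

    // Two reports of the same failure; only their timing relative to the
    // guard reset differs.
    static int reconnects(boolean duplicateArrivesAfterReset) {
        GuardResetSketch agent = new GuardResetSketch();
        Event event = new Event();
        agent.serviceFailed(event);           // first report
        if (duplicateArrivesAfterReset) {
            agent.runPendingReconnects();     // delay expired, guard reset
            agent.serviceFailed(event);       // late duplicate slips through
        } else {
            agent.serviceFailed(event);       // duplicate filtered by the guard
        }
        agent.runPendingReconnects();
        return agent.onServiceAddCalls;
    }

    public static void main(String[] args) {
        System.out.println(reconnects(false)); // prints 1
        System.out.println(reconnects(true));  // prints 2
    }
}
```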
> These two race conditions allow a single call to {{DiscoveryNetworkConnector.onServiceAdd(...)}} to result in multiple subsequent concurrent (re)calls, and these concurrent calls can spawn their own multiple concurrent calls.  The result can be an unlimited number of concurrent calls to {{onServiceAdd(...)}}.
> Unit Test
> =========
> The attached unit test demonstrates this bug by simulating a bridge failure that has yet to be detected by the remote broker (i.e., before the {{InactivityMonitor}} closes the connection).  The local broker attempts to re-establish the bridge, but its call to {{DemandForwardingBridgeSupport.startRemoteBridge()}} fails because the remote broker rejects the new connection since the old one still exists.  Since {{startRemoteBridge()}} sends multiple messages to the remote broker, multiple exceptions are generated:
> {code:title=DemandForwardingBridgeSupport.java}
> protected void startRemoteBridge() throws Exception {
> ...
>                 remoteBroker.oneway(brokerInfo);
> ...
>             remoteBroker.oneway(remoteConnectionInfo);
> ...
>             remoteBroker.oneway(producerInfo);
> ...
>                 remoteBroker.oneway(demandConsumerInfo);
> }
> {code}
> The multiple exceptions result in multiple calls to {{DemandForwardingBridgeSupport.serviceRemoteException(...)}}, which allows the first race condition to be exhibited.  
> The first unit test has a 1s {{reconnectDelay}}, which is sufficient to make the second race condition improbable; therefore, this test generally passes. 
> The second unit test has a 0s {{reconnectDelay}}; on my system, this makes the timing of multiple calls to {{DemandForwardingBridgeSupport.serviceRemoteException(...)}} such that the second race condition is reliably exhibited, resulting in the unit test failing because it detects concurrent calls to {{DiscoveryNetworkConnector.onServiceAdd(...)}}.
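One way a test could detect overlapping calls is to count how many are in flight at once and record the peak. This is a minimal sketch under that assumption, not taken from the attached AMQ4159Test.java. ConcurrentCallDetector and its methods are hypothetical names, and the instrumented method would call enter() on entry and exit() in a finally block.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: tracks the peak number of calls in flight.
class ConcurrentCallDetector {
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger maxObserved = new AtomicInteger();

    // Call at the start of the instrumented method.
    void enter() {
        int now = active.incrementAndGet();
        maxObserved.accumulateAndGet(now, Math::max);
    }

    // Call in a finally block at the end of the instrumented method.
    void exit() {
        active.decrementAndGet();
    }

    int maxConcurrent() {
        return maxObserved.get();
    }

    public static void main(String[] args) {
        ConcurrentCallDetector detector = new ConcurrentCallDetector();
        detector.enter();
        detector.enter(); // second call starts before the first finishes
        detector.exit();
        detector.exit();
        System.out.println(detector.maxConcurrent()); // prints 2
    }
}
```

A test built this way would fail whenever maxConcurrent() exceeds 1 for a single configured service.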
> Solution
> ========
> While it would be possible to add a {{failed.compareAndSet(false,true)}} guard clause to {{DemandForwardingBridgeSupport.fireBridgeFailed()}}, and prevent the first race condition from allowing multiple calls to {{SimpleDiscoveryAgent.serviceFailed()}}, the root problem is the race condition in {{serviceFailed}}.  This can be trivially addressed by making a copy of the {{DiscoveryEvent}}, which prevents the original {{event.failed}} guard clause from being reset:
> {code:title=Patched SimpleDiscoveryAgent.java}
> public void serviceFailed(DiscoveryEvent devent) throws IOException {
>     final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
>     if (sevent.failed.compareAndSet(false, true)) {
>         listener.onServiceRemove(sevent);
>         taskRunner.execute(new Runnable() {
>             public void run() {
>                 SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
> ...
>                 event.connectTime = System.currentTimeMillis();
>                 event.failed.set(false);
>                 listener.onServiceAdd(event);
>             }
>         }, "Simple Discovery Agent");
>     }
> }
> {code} 
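The effect of copying the event can be checked with the same kind of single-threaded model. The sketch below assumes a copy constructor that carries over the event's state. EventCopySketch, Event and its fields are hypothetical and much simpler than the real {{SimpleDiscoveryEvent}}.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the patched behaviour, not ActiveMQ code.
class EventCopySketch {
    static final class Event {
        final String serviceName;
        final AtomicBoolean failed = new AtomicBoolean(false);

        Event(String serviceName) {
            this.serviceName = serviceName;
        }

        // Assumed copy constructor: carries over the state of the original.
        Event(Event other) {
            this.serviceName = other.serviceName;
            this.failed.set(other.failed.get());
        }
    }

    private final Queue<Runnable> reconnectTasks = new ArrayDeque<>();
    final List<Event> added = new ArrayList<>();

    void serviceFailed(Event sevent) {
        if (sevent.failed.compareAndSet(false, true)) {
            reconnectTasks.add(() -> {
                Event event = new Event(sevent); // fresh event for the new bridge
                event.failed.set(false);         // only the copy's guard reopens
                added.add(event);                // models listener.onServiceAdd(event)
            });
        }
    }

    void runPendingReconnects() {
        Runnable task;
        while ((task = reconnectTasks.poll()) != null) {
            task.run();
        }
    }

    // A duplicate report for the original event arrives after the reconnect.
    static int reconnectsWithLateDuplicate() {
        EventCopySketch agent = new EventCopySketch();
        Event original = new Event("remote-broker");
        agent.serviceFailed(original);
        agent.runPendingReconnects();
        agent.serviceFailed(original); // filtered: original.failed stays true
        agent.runPendingReconnects();
        return agent.added.size();
    }

    // The re-established bridge fails later and reports its own (copied) event.
    static int reconnectsAfterSecondFailure() {
        EventCopySketch agent = new EventCopySketch();
        agent.serviceFailed(new Event("remote-broker"));
        agent.runPendingReconnects();
        agent.serviceFailed(agent.added.get(0));
        agent.runPendingReconnects();
        return agent.added.size();
    }

    public static void main(String[] args) {
        System.out.println(reconnectsWithLateDuplicate());  // prints 1
        System.out.println(reconnectsAfterSecondFailure()); // prints 2
    }
}
```

In this model the original event's flag is never reset, so stale reports for the old bridge are dropped, while a later failure of the new bridge still triggers a reconnect because it is reported against the copy.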
