You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/11 06:45:09 UTC

svn commit: r420722 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network: DemandForwardingBridge.java DemandForwardingBridgeSupport.java DiscoveryNetworkConnector.java

Author: chirino
Date: Mon Jul 10 21:45:09 2006
New Revision: 420722

URL: http://svn.apache.org/viewvc?rev=420722&view=rev
Log:
Fixed Network Connection failure recovery.
http://issues.apache.org/activemq/browse/AMQ-802
http://issues.apache.org/activemq/browse/AMQ-805

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=420722&r1=420721&r2=420722&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Mon Jul 10 21:45:09 2006
@@ -53,9 +53,6 @@
                     ServiceSupport.dispose(this);
                 }
             }
-            if (!disposed){
-                triggerLocalStartBridge();
-            }
         }
     }
 

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=420722&r1=420721&r2=420722&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Jul 10 21:45:09 2006
@@ -70,14 +70,14 @@
     protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class);
     protected final Transport localBroker;
     protected final Transport remoteBroker;
-    protected IdGenerator idGenerator = new IdGenerator();
-    protected LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+    protected final IdGenerator idGenerator = new IdGenerator();
+    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
     protected ConnectionInfo localConnectionInfo;
     protected ConnectionInfo remoteConnectionInfo;
     protected SessionInfo localSessionInfo;
     protected ProducerInfo producerInfo;
-    protected String localBrokerName;
-    protected String remoteBrokerName;
+    protected String localBrokerName = "Unknown";
+    protected String remoteBrokerName = "Unknown";
     protected String localClientId;
     protected String userName;
     protected String password;
@@ -87,22 +87,22 @@
     protected String name = "bridge";
     protected ConsumerInfo demandConsumerInfo;
     protected int demandConsumerDispatched;
-    protected AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
-    protected AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
+    protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
+    protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
     protected boolean disposed = false;
     protected BrokerId localBrokerId;
     protected ActiveMQDestination[] excludedDestinations;
     protected ActiveMQDestination[] dynamicallyIncludedDestinations;
     protected ActiveMQDestination[] staticallyIncludedDestinations;
     protected ActiveMQDestination[] durableDestinations;
-    protected ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
-    protected ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
+    protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
+    protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
     protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
     protected CountDownLatch startedLatch = new CountDownLatch(2);
     protected boolean decreaseNetworkConsumerPriority;
-    protected boolean shutDown;
     protected int networkTTL = 1;
     protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
+    protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
 
     
     public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@@ -111,7 +111,6 @@
     }
 
     public void start() throws Exception {
-        log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
         localBroker.setTransportListener(new DefaultTransportListener(){
             public void onCommand(Command command){
                 serviceLocalCommand(command);
@@ -130,16 +129,23 @@
                 serviceRemoteException(error);
             }
     
-            public synchronized void transportInterupted(){
+            public void transportInterupted(){
                 //clear any subscriptions - to try and prevent the bridge from stalling the broker
                 if( remoteInterupted.compareAndSet(false, true) ) {
-                    log.warn("Outbound transport to " + remoteBrokerName +  " interrupted ...");
-                    clearDownSubscriptions();
-                    try{
-                        localBroker.oneway(localConnectionInfo.createRemoveCommand());
-                    }catch(IOException e){
-                        log.warn("Caught exception from local start",e);
-                    }
+                	
+                    log.debug("Outbound transport to " + remoteBrokerName + " interrupted.");                                        
+
+                	if( localBridgeStarted.get() ) {
+	                    clearDownSubscriptions();
+	                    synchronized( DemandForwardingBridgeSupport.this ) {
+		                    try{
+		                        localBroker.oneway(localConnectionInfo.createRemoveCommand());
+		                    }catch(IOException e){
+		                        log.warn("Caught exception from local start",e);
+		                    }
+	                    }
+                	}
+                	
                     localBridgeStarted.set(false);
                     remoteBridgeStarted.set(false);
                     startedLatch = new CountDownLatch(2);
@@ -147,35 +153,33 @@
                 
             }
     
-            public synchronized void transportResumed(){
-                
+            public void transportResumed(){                
                 if( remoteInterupted.compareAndSet(true, false) ) {
-                    
-                    //restart and static subscriptions - the consumer advisories will be replayed
-                    log.info("Outbound transport to " + remoteBrokerName + " resumed");
-                    
-//                    try{
-//                        triggerLocalStartBridge();
-//                    }catch(IOException e){
-//                        log.warn("Caught exception from local start",e);
-//                    }
-    
-                    try{
-                        // clear out the previous connection as it may have missed some consumer advisories.
-                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
-                        triggerRemoteStartBridge();
-                    }catch(IOException e){
-                        log.warn("Caught exception from remote start",e);
-                    }
-                    
+                	
+                	// We want to slow down false connects so that we don't get in a busy loop.
+                	// False connects can occurr if you using SSH tunnels.
+                	if( !lastConnectSucceeded.get() ) {
+                		try {
+                            log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");                                        
+							Thread.sleep(1000);
+						} catch (InterruptedException e) {
+							Thread.currentThread().interrupt();
+						}
+                	}
+                    lastConnectSucceeded.set(false);
+
+                    log.debug("Outbound transport to " + remoteBrokerName + " resumed");                                        
                 }
-                
             }
         });
         localBroker.start();
         remoteBroker.start();
-//        triggerLocalStartBridge();
-        triggerRemoteStartBridge();
+        
+        try{                    	
+            triggerRemoteStartBridge();
+        }catch(IOException e){
+            log.warn("Caught exception from remote start",e);
+        }
     }
 
     protected void triggerLocalStartBridge() throws IOException {
@@ -184,7 +188,7 @@
                 try{
                     startLocalBridge();
                 }catch(Exception e){
-                    log.error("Failed to start network bridge: "+e,e);
+                    serviceLocalException(e);
                 }
             }
         };
@@ -197,7 +201,7 @@
                 try{
                     startRemoteBridge();
                 }catch(Exception e){
-                    log.error("Failed to start network bridge: "+e,e);
+                    serviceRemoteException(e);
                 }
             }
         };
@@ -206,121 +210,109 @@
 
     protected void startLocalBridge() throws Exception {
         if(localBridgeStarted.compareAndSet(false,true)){
-            
-            localConnectionInfo=new ConnectionInfo();
-            localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-            localClientId="NC_"+remoteBrokerName+"_inbound"+name;
-            localConnectionInfo.setClientId(localClientId);
-            localConnectionInfo.setUserName(userName);
-            localConnectionInfo.setPassword(password);
-            localBroker.oneway(localConnectionInfo);
-
-            localSessionInfo=new SessionInfo(localConnectionInfo,1);
-            localBroker.oneway(localSessionInfo);
-            
-            log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
-                            +") has been established.");
-            startedLatch.countDown();
-            setupStaticDestinations();
+            synchronized( this ) {
+	            localConnectionInfo=new ConnectionInfo();
+	            localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+	            localClientId="NC_"+remoteBrokerName+"_inbound"+name;
+	            localConnectionInfo.setClientId(localClientId);
+	            localConnectionInfo.setUserName(userName);
+	            localConnectionInfo.setPassword(password);
+	            localBroker.oneway(localConnectionInfo);
+	
+	            localSessionInfo=new SessionInfo(localConnectionInfo,1);
+	            localBroker.oneway(localSessionInfo);
+	            
+	            log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+	                            +") has been established.");
+	            
+	            startedLatch.countDown();
+	            setupStaticDestinations();
+            }
         }
     }
 
     protected void startRemoteBridge() throws Exception {
-        if(remoteBridgeStarted.compareAndSet(false,true)){
-
-            remoteConnectionInfo=new ConnectionInfo();
-            remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
-            remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
-            remoteConnectionInfo.setUserName(userName);
-            remoteConnectionInfo.setPassword(password);
-            remoteBroker.oneway(remoteConnectionInfo);
-
-            BrokerInfo brokerInfo=new BrokerInfo();
-            brokerInfo.setBrokerName(localBrokerName);
-            remoteBroker.oneway(brokerInfo);
-
-            SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
-            remoteBroker.oneway(remoteSessionInfo);
-
-            producerInfo=new ProducerInfo(remoteSessionInfo,1);
-            producerInfo.setResponseRequired(false);
-            remoteBroker.oneway(producerInfo);
-
-            // Listen to consumer advisory messages on the remote broker to determine demand.
-            demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
-            demandConsumerInfo.setDispatchAsync(dispatchAsync);
-            demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
-                            +destinationFilter));
-            demandConsumerInfo.setPrefetchSize(prefetchSize);
-            remoteBroker.oneway(demandConsumerInfo);
-            //we want information about Destinations as well
-            ConsumerInfo destinationInfo  = new ConsumerInfo(remoteSessionInfo,2);
-            destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
-            destinationInfo.setPrefetchSize(prefetchSize);
-            remoteBroker.oneway(destinationInfo);
-            startedLatch.countDown();
+        if(remoteBridgeStarted.compareAndSet(false,true)) {
+    
+        	synchronized (this) {
+        		
+            	if( remoteConnectionInfo!=null ) {
+            		remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+            	}
+            	
+                remoteConnectionInfo=new ConnectionInfo();
+                remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+                remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
+                remoteConnectionInfo.setUserName(userName);
+                remoteConnectionInfo.setPassword(password);
+                remoteBroker.oneway(remoteConnectionInfo);
+
+                BrokerInfo brokerInfo=new BrokerInfo();
+                brokerInfo.setBrokerName(localBrokerName);
+                remoteBroker.oneway(brokerInfo);
+
+                SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
+                remoteBroker.oneway(remoteSessionInfo);
+
+                producerInfo=new ProducerInfo(remoteSessionInfo,1);
+                producerInfo.setResponseRequired(false);
+                remoteBroker.oneway(producerInfo);
+
+                // Listen to consumer advisory messages on the remote broker to determine demand.
+                demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
+                demandConsumerInfo.setDispatchAsync(dispatchAsync);
+                demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+                                +destinationFilter));
+                demandConsumerInfo.setPrefetchSize(prefetchSize);
+                remoteBroker.oneway(demandConsumerInfo);
+                //we want information about Destinations as well
+                ConsumerInfo destinationInfo  = new ConsumerInfo(remoteSessionInfo,2);
+                destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+                destinationInfo.setPrefetchSize(prefetchSize);
+                remoteBroker.oneway(destinationInfo);
+                startedLatch.countDown();
+                
+                if (!disposed){
+                    triggerLocalStartBridge();
+                }
+        		
+        	}
         }
     }
 
     public void stop() throws Exception {
-        shutDown = true;
-        doStop();
-    }
-
-    /**
-     * stop the bridge
-     * @throws Exception 
-     */
-    protected void doStop() throws Exception {
         log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed already ? "+disposed);
-        if(!disposed){
-            try{
-                disposed=true;
-                
-                remoteBridgeStarted.set(false);
-                if(!shutDown){
-                   remoteBroker.oneway(new ShutdownInfo());
-                    if(localConnectionInfo!=null){
-                        localBroker.oneway(localConnectionInfo.createRemoveCommand());
-                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
-                    }
-                    localBroker.oneway(new ShutdownInfo());
-                }
-                localBroker.setTransportListener(null);
-                remoteBroker.setTransportListener(null);
-            }catch(IOException e){
-                log.debug("Caught exception stopping",e);
-            }finally{
-                ServiceStopper ss=new ServiceStopper();
-                ss.stop(localBroker);
-                ss.stop(remoteBroker);
-                ss.throwFirstException();
-            }
-        }
+        if (!disposed) {
+			try {
+				disposed = true;
+
+				remoteBridgeStarted.set(false);
+				
+				localBroker.oneway(new ShutdownInfo());
+				remoteBroker.oneway(new ShutdownInfo());
+				
+			} catch (IOException e) {
+				log.debug("Caught exception stopping", e);
+			} finally {
+				ServiceStopper ss = new ServiceStopper();
+				ss.stop(localBroker);
+				ss.stop(remoteBroker);
+				ss.throwFirstException();
+			}
+		}
         log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
     }
     
-    protected void doStopLocal(){
-        try{
-            if(!shutDown){
-                if(localConnectionInfo!=null){
-                    localBroker.oneway(localConnectionInfo.createRemoveCommand());
-                }
-                localBroker.oneway(new ShutdownInfo());
-            }
-            localBroker.setTransportListener(null);
-        }catch(IOException e){
-            log.debug("Caught exception stopping",e);
-        }finally{
-            ServiceStopper ss=new ServiceStopper();
-            ss.stop(localBroker);
-            localBridgeStarted.set(false);
-        }
-    }
-
     protected void serviceRemoteException(Throwable error) {
-        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
-        ServiceSupport.dispose(this);
+    	if( !disposed ) {
+	        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a remote error: "+error);
+	        log.debug("The remote Exception was: "+error, error);
+	        new Thread() {
+	        	public void run() {
+	                ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+	        	}
+	        }.start();
+    	}
     }
 
     protected void serviceRemoteCommand(Command command) {
@@ -336,7 +328,10 @@
                         demandConsumerDispatched=0;
                     }
                 }else if(command.isBrokerInfo()){
-                    serviceRemoteBrokerInfo(command);
+                	
+                	lastConnectSucceeded.set(true);
+                	serviceRemoteBrokerInfo(command);
+                    
                 }else if(command.getClass() == ConnectionError.class ) {
                 	ConnectionError ce = (ConnectionError) command;
                 	serviceRemoteException(ce.getException());
@@ -344,6 +339,7 @@
                     switch(command.getDataStructureType()){
                     case KeepAliveInfo.DATA_STRUCTURE_TYPE:
                     case WireFormatInfo.DATA_STRUCTURE_TYPE:
+                    case ShutdownInfo.DATA_STRUCTURE_TYPE:
                         break;
                     default:
                         log.warn("Unexpected remote command: "+command);
@@ -413,7 +409,10 @@
                 ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
                 tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
             }
+                        
             destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
+            
+            log.debug("Replying destination control command: "+destInfo);            
             localBroker.oneway(destInfo);
             
         }
@@ -424,8 +423,15 @@
     }
 
     protected void serviceLocalException(Throwable error) {
-        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
-        ServiceSupport.dispose(this);
+    	if( !disposed ) {
+	        log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
+	        log.debug("The local Exception was:"+error,error);
+	        new Thread() {
+	        	public void run() {
+	                ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+	        	}
+	        }.start();
+    	}
     }
 
     protected void addSubscription(DemandSubscription sub) throws IOException {
@@ -502,16 +508,6 @@
                             remoteBroker.asyncRequest(message, callback);
                         }
                         
-                      // Ack on every message since we don't know if the broker is blocked due to memory
-                      // usage and is waiting for an Ack to un-block him. 
-                       
-                      // Acking a range is more efficient, but also more prone to locking up a server
-                      // Perhaps doing something like the following should be policy based.
-//                        int dispatched = sub.incrementDispatched();
-//                        if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
-//                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
-//                            sub.setDispatched(0);
-//                        }
                     }
                 }else if(command.isBrokerInfo()){
                     serviceLocalBrokerInfo(command);
@@ -521,8 +517,7 @@
                     // the local transport is just shutting down temporarily until the remote side
                     // is restored.
                     if( !remoteInterupted.get() ) { 
-                        shutDown = true;
-                        doStop();
+                        stop();
                     }
                 }else if(command.getClass() == ConnectionError.class ) {
                 	ConnectionError ce = (ConnectionError) command;
@@ -695,21 +690,7 @@
     public void setNetworkTTL(int networkTTL) {
         this.networkTTL=networkTTL;
     }
-
-    /**
-     * @return Returns the shutDown.
-     */
-    public boolean isShutDown() {
-        return shutDown;
-    }
-
-    /**
-     * @param shutDown The shutDown to set.
-     */
-    public void setShutDown(boolean shutDown) {
-        this.shutDown=shutDown;
-    }
-
+  
     public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
         if(brokerPath!=null){
             for(int i=0;i<brokerPath.length;i++){
@@ -843,7 +824,8 @@
     }
 
     protected void clearDownSubscriptions() {
-        
+        subscriptionMapByLocalId.clear();
+        subscriptionMapByRemoteId.clear();
     }
 
     protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=420722&r1=420721&r2=420722&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Mon Jul 10 21:45:09 2006
@@ -16,9 +16,11 @@
  */
 package org.apache.activemq.network;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
 
-import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
@@ -28,10 +30,7 @@
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.util.ServiceSupport;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
 
 /**
  * A network connector which uses a discovery agent to detect the remote brokers
@@ -58,6 +57,11 @@
     }
 
     public void onServiceAdd(DiscoveryEvent event) {
+    	
+    	// Ignore events once we start stopping.
+    	if( isStopped() || isStopping() )
+    		return;
+    	
         String url = event.getServiceName();
         if (url != null) {
 
@@ -80,7 +84,7 @@
             URI connectUri = uri;
             if (failover) {
                 try {
-                    connectUri = new URI("failover:" + connectUri);
+                    connectUri = new URI("failover:(" + connectUri+")?maxReconnectDelay=1000");
                 }
                 catch (URISyntaxException e) {
                     log.warn("Could not create failover URI: " + connectUri);
@@ -90,22 +94,24 @@
 
             log.info("Establishing network connection between from " + localURI + " to " + connectUri);
 
-            Transport localTransport;
+            Transport remoteTransport;
             try {
-                localTransport = createLocalTransport();
+                remoteTransport = TransportFactory.connect(connectUri);
             }
             catch (Exception e) {
-                log.warn("Could not connect to local URI: " + localURI + ": " + e, e);
+                log.warn("Could not connect to remote URI: " + localURI + ": " + e.getMessage());
+                log.debug("Connection failure exception: "+ e, e);
                 return;
             }
 
-            Transport remoteTransport;
+            Transport localTransport;
             try {
-                remoteTransport = TransportFactory.connect(connectUri);
+                localTransport = createLocalTransport();
             }
             catch (Exception e) {
-                ServiceSupport.dispose(localTransport);
-                log.warn("Could not connect to remote URI: " + connectUri + ": " + e, e);
+                ServiceSupport.dispose(remoteTransport);
+                log.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
+                log.debug("Connection failure exception: "+ e, e);
                 return;
             }
 
@@ -117,7 +123,13 @@
             catch (Exception e) {
                 ServiceSupport.dispose(localTransport);
                 ServiceSupport.dispose(remoteTransport);
-                log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e, e);
+                log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
+                log.debug("Start failure exception: "+ e, e);
+                
+                try {
+					discoveryAgent.serviceFailed(event);
+				} catch (IOException e1) {
+				}
                 return;
             }
         }
@@ -196,45 +208,81 @@
         if (conduitSubscriptions) {
             if (dynamicOnly) {
                 result = new ConduitBridge(localTransport, remoteTransport) {
-                    protected void serviceRemoteException(Exception error) {
-                        super.serviceRemoteException(error);
-                        try {
-                            // Notify the discovery agent that the remote broker
-                            // failed.
-                            discoveryAgent.serviceFailed(event);
-                        }
-                        catch (IOException e) {
-                        }
-                    }
+                	protected void serviceLocalException(Throwable error) {
+                		try {
+                			super.serviceLocalException(error);
+                		} finally {
+                			fireServiceFailed();
+                		}
+                	}
+                	protected void serviceRemoteException(Throwable error) {
+                		try {
+                    		super.serviceRemoteException(error);
+                		} finally {
+                			fireServiceFailed();
+                		}
+                	}
+                	public void fireServiceFailed() {
+                		if( !isStopped() ) {
+                            try {
+                                discoveryAgent.serviceFailed(event);
+                            } catch (IOException e) {
+                            }
+                		}
+                	}
                 };
             }
             else {
                 result = new DurableConduitBridge(localTransport, remoteTransport) {
-                    protected void serviceRemoteException(Exception error) {
-                        super.serviceRemoteException(error);
-                        try {
-                            // Notify the discovery agent that the remote broker
-                            // failed.
-                            discoveryAgent.serviceFailed(event);
-                        }
-                        catch (IOException e) {
-                        }
-                    }
+                	protected void serviceLocalException(Throwable error) {
+                		try {
+                			super.serviceLocalException(error);
+                		} finally {
+                			fireServiceFailed();
+                		}
+                	}
+                	protected void serviceRemoteException(Throwable error) {
+                		try {
+                    		super.serviceRemoteException(error);
+                		} finally {
+                			fireServiceFailed();
+                		}
+                	}
+                	public void fireServiceFailed() {
+                		if( !isStopped() ) {
+                            try {
+                                discoveryAgent.serviceFailed(event);
+                            } catch (IOException e) {
+                            }
+                		}
+                	}
                 };
             }
         }
         else {
-            result = new DemandForwardingBridge(localTransport, remoteTransport) {
-                protected void serviceRemoteException(Exception error) {
-                    super.serviceRemoteException(error);
-                    try {
-                        // Notify the discovery agent that the remote broker
-                        // failed.
-                        discoveryAgent.serviceFailed(event);
-                    }
-                    catch (IOException e) {
-                    }
-                }
+            result = new DemandForwardingBridge(localTransport, remoteTransport) {            	
+            	protected void serviceLocalException(Throwable error) {
+            		try {
+            			super.serviceLocalException(error);
+            		} finally {
+            			fireServiceFailed();
+            		}
+            	}
+            	protected void serviceRemoteException(Throwable error) {
+            		try {
+                		super.serviceRemoteException(error);
+            		} finally {
+            			fireServiceFailed();
+            		}
+            	}
+            	public void fireServiceFailed() {
+            		if( !isStopped() ) {
+                        try {
+                            discoveryAgent.serviceFailed(event);
+                        } catch (IOException e) {
+                        }
+            		}
+            	}
             };
         }
         return configureBridge(result);