You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2006/04/27 18:59:21 UTC

svn commit: r397589 - /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Author: rajdavies
Date: Thu Apr 27 09:59:18 2006
New Revision: 397589

URL: http://svn.apache.org/viewcvs?rev=397589&view=rev
Log:
close local transport if remote transport fails (and supports failover), and re-establish local transport on a successful re-connect

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=397589&r1=397588&r2=397589&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Apr 27 09:59:18 2006
@@ -57,6 +57,7 @@
 import org.apache.activemq.util.ServiceSupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import com.sun.tools.javac.tree.Tree.DoLoop;
 
 import java.io.IOException;
 
@@ -128,17 +129,25 @@
                 serviceRemoteException(error);
             }
     
-            public void transportInterupted(){
+            public synchronized void transportInterupted(){
                 //clear any subscriptions - to try and prevent the bridge from stalling the broker
                 log.warn("Outbound transport to " + remoteBrokerName +  " interrupted ...");
                 clearDownSubscriptions();
+                doStopLocal();
+                startedLatch = new CountDownLatch(2);
+                try{
+                    triggerLocalStartBridge();
+                }catch(IOException e){
+                    log.warn("Caught exception from local start",e);
+                }
                 
             }
     
-            public void transportResumed(){
+            public synchronized void transportResumed(){
                 //restart and static subscriptions - the consumer advisories will be replayed
                 log.info("Outbound transport to " + remoteBrokerName + " resumed");
                 setupStaticDestinations();
+                startedLatch.countDown();
                 
             }
         });
@@ -244,7 +253,7 @@
         if(!disposed){
             try{
                 disposed=true;
-                localBridgeStarted.set(false);
+                
                 remoteBridgeStarted.set(false);
                 if(!shutDown){
                    remoteBroker.oneway(new ShutdownInfo());
@@ -266,6 +275,23 @@
             }
         }
         log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
+    }
+    
+    protected void doStopLocal(){
+        try{
+            if(!shutDown){
+                if(localConnectionInfo!=null){
+                    localBroker.oneway(localConnectionInfo.createRemoveCommand());
+                }
+                localBroker.oneway(new ShutdownInfo());
+            }
+            localBroker.setTransportListener(null);
+        }catch(IOException e){
+            log.debug("Caught exception stopping",e);
+        }finally{
+            ServiceStopper ss=new ServiceStopper();
+            ss.stop(localBroker);
+        }
     }
 
     protected void serviceRemoteException(Exception error) {