You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/05/29 19:27:00 UTC

svn commit: r410123 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq: network/DemandForwardingBridgeSupport.java transport/failover/FailoverTransport.java

Author: chirino
Date: Mon May 29 10:27:00 2006
New Revision: 410123

URL: http://svn.apache.org/viewvc?rev=410123&view=rev
Log:
Fix for: http://issues.apache.org/activemq/browse/AMQ-726

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=410123&r1=410122&r2=410123&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon May 29 10:27:00 2006
@@ -101,6 +101,7 @@
     protected boolean decreaseNetworkConsumerPriority;
     protected boolean shutDown;
     protected int networkTTL = 1;
+    protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
 
     
     public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@@ -130,28 +131,49 @@
     
             public synchronized void transportInterupted(){
                 //clear any subscriptions - to try and prevent the bridge from stalling the broker
-                log.warn("Outbound transport to " + remoteBrokerName +  " interrupted ...");
-                clearDownSubscriptions();
-                doStopLocal();
-                startedLatch = new CountDownLatch(2);
-                try{
-                    triggerLocalStartBridge();
-                }catch(IOException e){
-                    log.warn("Caught exception from local start",e);
+                if( remoteInterupted.compareAndSet(false, true) ) {
+                    log.warn("Outbound transport to " + remoteBrokerName +  " interrupted ...");
+                    clearDownSubscriptions();
+                    try{
+                        localBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+                    }catch(IOException e){
+                        log.warn("Caught exception from local start",e);
+                    }
+                    localBridgeStarted.set(false);
+                    remoteBridgeStarted.set(false);
+                    startedLatch = new CountDownLatch(2);
                 }
                 
             }
     
             public synchronized void transportResumed(){
-                //restart and static subscriptions - the consumer advisories will be replayed
-                log.info("Outbound transport to " + remoteBrokerName + " resumed");
-                setupStaticDestinations();
-                startedLatch.countDown();
+                
+                if( remoteInterupted.compareAndSet(true, false) ) {
+                    
+                    //restart and static subscriptions - the consumer advisories will be replayed
+                    log.info("Outbound transport to " + remoteBrokerName + " resumed");
+                    
+//                    try{
+//                        triggerLocalStartBridge();
+//                    }catch(IOException e){
+//                        log.warn("Caught exception from local start",e);
+//                    }
+    
+                    try{
+                        // clear out the previous connection as it may have missed some consumer advisories.
+                        remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+                        triggerRemoteStartBridge();
+                    }catch(IOException e){
+                        log.warn("Caught exception from remote start",e);
+                    }
+                    
+                }
                 
             }
         });
         localBroker.start();
         remoteBroker.start();
+//        triggerLocalStartBridge();
         triggerRemoteStartBridge();
     }
 
@@ -160,7 +182,7 @@
             public void run(){
                 try{
                     startLocalBridge();
-                }catch(IOException e){
+                }catch(Exception e){
                     log.error("Failed to start network bridge: "+e,e);
                 }
             }
@@ -173,7 +195,7 @@
             public void run(){
                 try{
                     startRemoteBridge();
-                }catch(IOException e){
+                }catch(Exception e){
                     log.error("Failed to start network bridge: "+e,e);
                 }
             }
@@ -181,8 +203,9 @@
         thead.start();
     }
 
-    protected void startLocalBridge() throws IOException {
+    protected void startLocalBridge() throws Exception {
         if(localBridgeStarted.compareAndSet(false,true)){
+            
             localConnectionInfo=new ConnectionInfo();
             localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
             localClientId="NC_"+remoteBrokerName+"_inbound"+name;
@@ -201,7 +224,7 @@
         }
     }
 
-    protected void startRemoteBridge() throws IOException {
+    protected void startRemoteBridge() throws Exception {
         if(remoteBridgeStarted.compareAndSet(false,true)){
 
             remoteConnectionInfo=new ConnectionInfo();
@@ -229,7 +252,7 @@
                             +destinationFilter));
             demandConsumerInfo.setPrefetchSize(prefetchSize);
             remoteBroker.oneway(demandConsumerInfo);
-            //we want infomation about Destinations as well
+            //we want information about Destinations as well
             ConsumerInfo destinationInfo  = new ConsumerInfo(remoteSessionInfo,2);
             destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
             destinationInfo.setPrefetchSize(prefetchSize);
@@ -290,6 +313,7 @@
         }finally{
             ServiceStopper ss=new ServiceStopper();
             ss.stop(localBroker);
+            localBridgeStarted.set(false);
         }
     }
 
@@ -489,9 +513,13 @@
                     serviceLocalBrokerInfo(command);
                 }else if(command.isShutdownInfo()){
                     log.info(localBrokerName+" Shutting down");
-                    shutDown = true;
-                    doStop();
-                   
+                    // Don't shut down the whole connector if the remote side was interrupted.
+                    // the local transport is just shutting down temporarily until the remote side
+                    // is restored.
+                    if( !remoteInterupted.get() ) { 
+                        shutDown = true;
+                        doStop();
+                    }
                     
                 }else{
                     switch(command.getDataStructureType()){

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=410123&r1=410122&r2=410123&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon May 29 10:27:00 2006
@@ -229,8 +229,8 @@
                 ServiceSupport.dispose(connectedTransport);
                 connectedTransport = null;
                 connectedTransportURI = null;
-                reconnectTask.wakeup();
             }
+            reconnectTask.wakeup();
         }
     }