You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/05/29 19:27:00 UTC
svn commit: r410123 - in /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq: network/DemandForwardingBridgeSupport.java transport/failover/FailoverTransport.java
Author: chirino
Date: Mon May 29 10:27:00 2006
New Revision: 410123
URL: http://svn.apache.org/viewvc?rev=410123&view=rev
Log:
Fix for: http://issues.apache.org/activemq/browse/AMQ-726
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=410123&r1=410122&r2=410123&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon May 29 10:27:00 2006
@@ -101,6 +101,7 @@
protected boolean decreaseNetworkConsumerPriority;
protected boolean shutDown;
protected int networkTTL = 1;
+ protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@@ -130,28 +131,49 @@
public synchronized void transportInterupted(){
//clear any subscriptions - to try and prevent the bridge from stalling the broker
- log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
- clearDownSubscriptions();
- doStopLocal();
- startedLatch = new CountDownLatch(2);
- try{
- triggerLocalStartBridge();
- }catch(IOException e){
- log.warn("Caught exception from local start",e);
+ if( remoteInterupted.compareAndSet(false, true) ) {
+ log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
+ clearDownSubscriptions();
+ try{
+ localBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+ }catch(IOException e){
+ log.warn("Caught exception from local start",e);
+ }
+ localBridgeStarted.set(false);
+ remoteBridgeStarted.set(false);
+ startedLatch = new CountDownLatch(2);
}
}
public synchronized void transportResumed(){
- //restart and static subscriptions - the consumer advisories will be replayed
- log.info("Outbound transport to " + remoteBrokerName + " resumed");
- setupStaticDestinations();
- startedLatch.countDown();
+
+ if( remoteInterupted.compareAndSet(true, false) ) {
+
+ //restart and static subscriptions - the consumer advisories will be replayed
+ log.info("Outbound transport to " + remoteBrokerName + " resumed");
+
+// try{
+// triggerLocalStartBridge();
+// }catch(IOException e){
+// log.warn("Caught exception from local start",e);
+// }
+
+ try{
+ // clear out the previous connection as it may have missed some consumer advisories.
+ remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+ triggerRemoteStartBridge();
+ }catch(IOException e){
+ log.warn("Caught exception from remote start",e);
+ }
+
+ }
}
});
localBroker.start();
remoteBroker.start();
+// triggerLocalStartBridge();
triggerRemoteStartBridge();
}
@@ -160,7 +182,7 @@
public void run(){
try{
startLocalBridge();
- }catch(IOException e){
+ }catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
}
}
@@ -173,7 +195,7 @@
public void run(){
try{
startRemoteBridge();
- }catch(IOException e){
+ }catch(Exception e){
log.error("Failed to start network bridge: "+e,e);
}
}
@@ -181,8 +203,9 @@
thead.start();
}
- protected void startLocalBridge() throws IOException {
+ protected void startLocalBridge() throws Exception {
if(localBridgeStarted.compareAndSet(false,true)){
+
localConnectionInfo=new ConnectionInfo();
localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
localClientId="NC_"+remoteBrokerName+"_inbound"+name;
@@ -201,7 +224,7 @@
}
}
- protected void startRemoteBridge() throws IOException {
+ protected void startRemoteBridge() throws Exception {
if(remoteBridgeStarted.compareAndSet(false,true)){
remoteConnectionInfo=new ConnectionInfo();
@@ -229,7 +252,7 @@
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
- //we want infomation about Destinations as well
+ //we want information about Destinations as well
ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
destinationInfo.setPrefetchSize(prefetchSize);
@@ -290,6 +313,7 @@
}finally{
ServiceStopper ss=new ServiceStopper();
ss.stop(localBroker);
+ localBridgeStarted.set(false);
}
}
@@ -489,9 +513,13 @@
serviceLocalBrokerInfo(command);
}else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down");
- shutDown = true;
- doStop();
-
+ // Don't shut down the whole connector if the remote side was interrupted.
+ // the local transport is just shutting down temporarily until the remote side
+ // is restored.
+ if( !remoteInterupted.get() ) {
+ shutDown = true;
+ doStop();
+ }
}else{
switch(command.getDataStructureType()){
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=410123&r1=410122&r2=410123&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Mon May 29 10:27:00 2006
@@ -229,8 +229,8 @@
ServiceSupport.dispose(connectedTransport);
connectedTransport = null;
connectedTransportURI = null;
- reconnectTask.wakeup();
}
+ reconnectTask.wakeup();
}
}