You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/11 06:45:09 UTC
svn commit: r420722 - in
/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network:
DemandForwardingBridge.java DemandForwardingBridgeSupport.java
DiscoveryNetworkConnector.java
Author: chirino
Date: Mon Jul 10 21:45:09 2006
New Revision: 420722
URL: http://svn.apache.org/viewvc?rev=420722&view=rev
Log:
Fixed Network Connection failure recovery.
http://issues.apache.org/activemq/browse/AMQ-802
http://issues.apache.org/activemq/browse/AMQ-805
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=420722&r1=420721&r2=420722&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Mon Jul 10 21:45:09 2006
@@ -53,9 +53,6 @@
ServiceSupport.dispose(this);
}
}
- if (!disposed){
- triggerLocalStartBridge();
- }
}
}
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=420722&r1=420721&r2=420722&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Jul 10 21:45:09 2006
@@ -70,14 +70,14 @@
protected static final Log log = LogFactory.getLog(DemandForwardingBridge.class);
protected final Transport localBroker;
protected final Transport remoteBroker;
- protected IdGenerator idGenerator = new IdGenerator();
- protected LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+ protected final IdGenerator idGenerator = new IdGenerator();
+ protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
protected ConnectionInfo localConnectionInfo;
protected ConnectionInfo remoteConnectionInfo;
protected SessionInfo localSessionInfo;
protected ProducerInfo producerInfo;
- protected String localBrokerName;
- protected String remoteBrokerName;
+ protected String localBrokerName = "Unknown";
+ protected String remoteBrokerName = "Unknown";
protected String localClientId;
protected String userName;
protected String password;
@@ -87,22 +87,22 @@
protected String name = "bridge";
protected ConsumerInfo demandConsumerInfo;
protected int demandConsumerDispatched;
- protected AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
- protected AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
+ protected final AtomicBoolean localBridgeStarted = new AtomicBoolean(false);
+ protected final AtomicBoolean remoteBridgeStarted = new AtomicBoolean(false);
protected boolean disposed = false;
protected BrokerId localBrokerId;
protected ActiveMQDestination[] excludedDestinations;
protected ActiveMQDestination[] dynamicallyIncludedDestinations;
protected ActiveMQDestination[] staticallyIncludedDestinations;
protected ActiveMQDestination[] durableDestinations;
- protected ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
- protected ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
+ protected final ConcurrentHashMap subscriptionMapByLocalId = new ConcurrentHashMap();
+ protected final ConcurrentHashMap subscriptionMapByRemoteId = new ConcurrentHashMap();
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected boolean decreaseNetworkConsumerPriority;
- protected boolean shutDown;
protected int networkTTL = 1;
protected final AtomicBoolean remoteInterupted = new AtomicBoolean(false);
+ protected final AtomicBoolean lastConnectSucceeded = new AtomicBoolean(false);
public DemandForwardingBridgeSupport(final Transport localBroker, final Transport remoteBroker) {
@@ -111,7 +111,6 @@
}
public void start() throws Exception {
- log.info("Starting a network connection between "+localBroker+" and "+remoteBroker+" has been established.");
localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){
serviceLocalCommand(command);
@@ -130,16 +129,23 @@
serviceRemoteException(error);
}
- public synchronized void transportInterupted(){
+ public void transportInterupted(){
//clear any subscriptions - to try and prevent the bridge from stalling the broker
if( remoteInterupted.compareAndSet(false, true) ) {
- log.warn("Outbound transport to " + remoteBrokerName + " interrupted ...");
- clearDownSubscriptions();
- try{
- localBroker.oneway(localConnectionInfo.createRemoveCommand());
- }catch(IOException e){
- log.warn("Caught exception from local start",e);
- }
+
+ log.debug("Outbound transport to " + remoteBrokerName + " interrupted.");
+
+ if( localBridgeStarted.get() ) {
+ clearDownSubscriptions();
+ synchronized( DemandForwardingBridgeSupport.this ) {
+ try{
+ localBroker.oneway(localConnectionInfo.createRemoveCommand());
+ }catch(IOException e){
+ log.warn("Caught exception from local start",e);
+ }
+ }
+ }
+
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
startedLatch = new CountDownLatch(2);
@@ -147,35 +153,33 @@
}
- public synchronized void transportResumed(){
-
+ public void transportResumed(){
if( remoteInterupted.compareAndSet(true, false) ) {
-
- //restart and static subscriptions - the consumer advisories will be replayed
- log.info("Outbound transport to " + remoteBrokerName + " resumed");
-
-// try{
-// triggerLocalStartBridge();
-// }catch(IOException e){
-// log.warn("Caught exception from local start",e);
-// }
-
- try{
- // clear out the previous connection as it may have missed some consumer advisories.
- remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
- triggerRemoteStartBridge();
- }catch(IOException e){
- log.warn("Caught exception from remote start",e);
- }
-
+
+ // We want to slow down false connects so that we don't get in a busy loop.
+ // False connects can occur if you are using SSH tunnels.
+ if( !lastConnectSucceeded.get() ) {
+ try {
+ log.debug("Previous connection was never fully established. Sleeping for second to avoid busy loop.");
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ lastConnectSucceeded.set(false);
+
+ log.debug("Outbound transport to " + remoteBrokerName + " resumed");
}
-
}
});
localBroker.start();
remoteBroker.start();
-// triggerLocalStartBridge();
- triggerRemoteStartBridge();
+
+ try{
+ triggerRemoteStartBridge();
+ }catch(IOException e){
+ log.warn("Caught exception from remote start",e);
+ }
}
protected void triggerLocalStartBridge() throws IOException {
@@ -184,7 +188,7 @@
try{
startLocalBridge();
}catch(Exception e){
- log.error("Failed to start network bridge: "+e,e);
+ serviceLocalException(e);
}
}
};
@@ -197,7 +201,7 @@
try{
startRemoteBridge();
}catch(Exception e){
- log.error("Failed to start network bridge: "+e,e);
+ serviceRemoteException(e);
}
}
};
@@ -206,121 +210,109 @@
protected void startLocalBridge() throws Exception {
if(localBridgeStarted.compareAndSet(false,true)){
-
- localConnectionInfo=new ConnectionInfo();
- localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
- localClientId="NC_"+remoteBrokerName+"_inbound"+name;
- localConnectionInfo.setClientId(localClientId);
- localConnectionInfo.setUserName(userName);
- localConnectionInfo.setPassword(password);
- localBroker.oneway(localConnectionInfo);
-
- localSessionInfo=new SessionInfo(localConnectionInfo,1);
- localBroker.oneway(localSessionInfo);
-
- log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
- +") has been established.");
- startedLatch.countDown();
- setupStaticDestinations();
+ synchronized( this ) {
+ localConnectionInfo=new ConnectionInfo();
+ localConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+ localClientId="NC_"+remoteBrokerName+"_inbound"+name;
+ localConnectionInfo.setClientId(localClientId);
+ localConnectionInfo.setUserName(userName);
+ localConnectionInfo.setPassword(password);
+ localBroker.oneway(localConnectionInfo);
+
+ localSessionInfo=new SessionInfo(localConnectionInfo,1);
+ localBroker.oneway(localSessionInfo);
+
+ log.info("Network connection between "+localBroker+" and "+remoteBroker+"("+remoteBrokerName
+ +") has been established.");
+
+ startedLatch.countDown();
+ setupStaticDestinations();
+ }
}
}
protected void startRemoteBridge() throws Exception {
- if(remoteBridgeStarted.compareAndSet(false,true)){
-
- remoteConnectionInfo=new ConnectionInfo();
- remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
- remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
- remoteConnectionInfo.setUserName(userName);
- remoteConnectionInfo.setPassword(password);
- remoteBroker.oneway(remoteConnectionInfo);
-
- BrokerInfo brokerInfo=new BrokerInfo();
- brokerInfo.setBrokerName(localBrokerName);
- remoteBroker.oneway(brokerInfo);
-
- SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
- remoteBroker.oneway(remoteSessionInfo);
-
- producerInfo=new ProducerInfo(remoteSessionInfo,1);
- producerInfo.setResponseRequired(false);
- remoteBroker.oneway(producerInfo);
-
- // Listen to consumer advisory messages on the remote broker to determine demand.
- demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
- demandConsumerInfo.setDispatchAsync(dispatchAsync);
- demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
- +destinationFilter));
- demandConsumerInfo.setPrefetchSize(prefetchSize);
- remoteBroker.oneway(demandConsumerInfo);
- //we want information about Destinations as well
- ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
- destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
- destinationInfo.setPrefetchSize(prefetchSize);
- remoteBroker.oneway(destinationInfo);
- startedLatch.countDown();
+ if(remoteBridgeStarted.compareAndSet(false,true)) {
+
+ synchronized (this) {
+
+ if( remoteConnectionInfo!=null ) {
+ remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+ }
+
+ remoteConnectionInfo=new ConnectionInfo();
+ remoteConnectionInfo.setConnectionId(new ConnectionId(idGenerator.generateId()));
+ remoteConnectionInfo.setClientId("NC_"+localBrokerName+"_outbound"+name);
+ remoteConnectionInfo.setUserName(userName);
+ remoteConnectionInfo.setPassword(password);
+ remoteBroker.oneway(remoteConnectionInfo);
+
+ BrokerInfo brokerInfo=new BrokerInfo();
+ brokerInfo.setBrokerName(localBrokerName);
+ remoteBroker.oneway(brokerInfo);
+
+ SessionInfo remoteSessionInfo=new SessionInfo(remoteConnectionInfo,1);
+ remoteBroker.oneway(remoteSessionInfo);
+
+ producerInfo=new ProducerInfo(remoteSessionInfo,1);
+ producerInfo.setResponseRequired(false);
+ remoteBroker.oneway(producerInfo);
+
+ // Listen to consumer advisory messages on the remote broker to determine demand.
+ demandConsumerInfo=new ConsumerInfo(remoteSessionInfo,1);
+ demandConsumerInfo.setDispatchAsync(dispatchAsync);
+ demandConsumerInfo.setDestination(new ActiveMQTopic(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+ +destinationFilter));
+ demandConsumerInfo.setPrefetchSize(prefetchSize);
+ remoteBroker.oneway(demandConsumerInfo);
+ //we want information about Destinations as well
+ ConsumerInfo destinationInfo = new ConsumerInfo(remoteSessionInfo,2);
+ destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+ destinationInfo.setPrefetchSize(prefetchSize);
+ remoteBroker.oneway(destinationInfo);
+ startedLatch.countDown();
+
+ if (!disposed){
+ triggerLocalStartBridge();
+ }
+
+ }
}
}
public void stop() throws Exception {
- shutDown = true;
- doStop();
- }
-
- /**
- * stop the bridge
- * @throws Exception
- */
- protected void doStop() throws Exception {
log.debug(" stopping "+localBrokerName+ " bridge to " + remoteBrokerName + " is disposed already ? "+disposed);
- if(!disposed){
- try{
- disposed=true;
-
- remoteBridgeStarted.set(false);
- if(!shutDown){
- remoteBroker.oneway(new ShutdownInfo());
- if(localConnectionInfo!=null){
- localBroker.oneway(localConnectionInfo.createRemoveCommand());
- remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
- }
- localBroker.oneway(new ShutdownInfo());
- }
- localBroker.setTransportListener(null);
- remoteBroker.setTransportListener(null);
- }catch(IOException e){
- log.debug("Caught exception stopping",e);
- }finally{
- ServiceStopper ss=new ServiceStopper();
- ss.stop(localBroker);
- ss.stop(remoteBroker);
- ss.throwFirstException();
- }
- }
+ if (!disposed) {
+ try {
+ disposed = true;
+
+ remoteBridgeStarted.set(false);
+
+ localBroker.oneway(new ShutdownInfo());
+ remoteBroker.oneway(new ShutdownInfo());
+
+ } catch (IOException e) {
+ log.debug("Caught exception stopping", e);
+ } finally {
+ ServiceStopper ss = new ServiceStopper();
+ ss.stop(localBroker);
+ ss.stop(remoteBroker);
+ ss.throwFirstException();
+ }
+ }
log.debug(localBrokerName+ " bridge to " + remoteBrokerName + " stopped");
}
- protected void doStopLocal(){
- try{
- if(!shutDown){
- if(localConnectionInfo!=null){
- localBroker.oneway(localConnectionInfo.createRemoveCommand());
- }
- localBroker.oneway(new ShutdownInfo());
- }
- localBroker.setTransportListener(null);
- }catch(IOException e){
- log.debug("Caught exception stopping",e);
- }finally{
- ServiceStopper ss=new ServiceStopper();
- ss.stop(localBroker);
- localBridgeStarted.set(false);
- }
- }
-
protected void serviceRemoteException(Throwable error) {
- log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
- ServiceSupport.dispose(this);
+ if( !disposed ) {
+ log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a remote error: "+error);
+ log.debug("The remote Exception was: "+error, error);
+ new Thread() {
+ public void run() {
+ ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+ }
+ }.start();
+ }
}
protected void serviceRemoteCommand(Command command) {
@@ -336,7 +328,10 @@
demandConsumerDispatched=0;
}
}else if(command.isBrokerInfo()){
- serviceRemoteBrokerInfo(command);
+
+ lastConnectSucceeded.set(true);
+ serviceRemoteBrokerInfo(command);
+
}else if(command.getClass() == ConnectionError.class ) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());
@@ -344,6 +339,7 @@
switch(command.getDataStructureType()){
case KeepAliveInfo.DATA_STRUCTURE_TYPE:
case WireFormatInfo.DATA_STRUCTURE_TYPE:
+ case ShutdownInfo.DATA_STRUCTURE_TYPE:
break;
default:
log.warn("Unexpected remote command: "+command);
@@ -413,7 +409,10 @@
ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
}
+
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
+
+ log.debug("Replying destination control command: "+destInfo);
localBroker.oneway(destInfo);
}
@@ -424,8 +423,15 @@
}
protected void serviceLocalException(Throwable error) {
- log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown: "+error.getMessage(),error);
- ServiceSupport.dispose(this);
+ if( !disposed ) {
+ log.info("Network connection between "+localBroker+" and "+remoteBroker+" shutdown due to a local error: "+error);
+ log.debug("The local Exception was:"+error,error);
+ new Thread() {
+ public void run() {
+ ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
+ }
+ }.start();
+ }
}
protected void addSubscription(DemandSubscription sub) throws IOException {
@@ -502,16 +508,6 @@
remoteBroker.asyncRequest(message, callback);
}
- // Ack on every message since we don't know if the broker is blocked due to memory
- // usage and is waiting for an Ack to un-block him.
-
- // Acking a range is more efficient, but also more prone to locking up a server
- // Perhaps doing something like the following should be policy based.
-// int dispatched = sub.incrementDispatched();
-// if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
-// localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
-// sub.setDispatched(0);
-// }
}
}else if(command.isBrokerInfo()){
serviceLocalBrokerInfo(command);
@@ -521,8 +517,7 @@
// the local transport is just shutting down temporarily until the remote side
// is restored.
if( !remoteInterupted.get() ) {
- shutDown = true;
- doStop();
+ stop();
}
}else if(command.getClass() == ConnectionError.class ) {
ConnectionError ce = (ConnectionError) command;
@@ -695,21 +690,7 @@
public void setNetworkTTL(int networkTTL) {
this.networkTTL=networkTTL;
}
-
- /**
- * @return Returns the shutDown.
- */
- public boolean isShutDown() {
- return shutDown;
- }
-
- /**
- * @param shutDown The shutDown to set.
- */
- public void setShutDown(boolean shutDown) {
- this.shutDown=shutDown;
- }
-
+
public static boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if(brokerPath!=null){
for(int i=0;i<brokerPath.length;i++){
@@ -843,7 +824,8 @@
}
protected void clearDownSubscriptions() {
-
+ subscriptionMapByLocalId.clear();
+ subscriptionMapByRemoteId.clear();
}
protected abstract NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException;
Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java?rev=420722&r1=420721&r2=420722&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java Mon Jul 10 21:45:09 2006
@@ -16,9 +16,11 @@
*/
package org.apache.activemq.network;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
-import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@@ -28,10 +30,7 @@
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Iterator;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* A network connector which uses a discovery agent to detect the remote brokers
@@ -58,6 +57,11 @@
}
public void onServiceAdd(DiscoveryEvent event) {
+
+ // Ignore events once we start stopping.
+ if( isStopped() || isStopping() )
+ return;
+
String url = event.getServiceName();
if (url != null) {
@@ -80,7 +84,7 @@
URI connectUri = uri;
if (failover) {
try {
- connectUri = new URI("failover:" + connectUri);
+ connectUri = new URI("failover:(" + connectUri+")?maxReconnectDelay=1000");
}
catch (URISyntaxException e) {
log.warn("Could not create failover URI: " + connectUri);
@@ -90,22 +94,24 @@
log.info("Establishing network connection between from " + localURI + " to " + connectUri);
- Transport localTransport;
+ Transport remoteTransport;
try {
- localTransport = createLocalTransport();
+ remoteTransport = TransportFactory.connect(connectUri);
}
catch (Exception e) {
- log.warn("Could not connect to local URI: " + localURI + ": " + e, e);
+ log.warn("Could not connect to remote URI: " + localURI + ": " + e.getMessage());
+ log.debug("Connection failure exception: "+ e, e);
return;
}
- Transport remoteTransport;
+ Transport localTransport;
try {
- remoteTransport = TransportFactory.connect(connectUri);
+ localTransport = createLocalTransport();
}
catch (Exception e) {
- ServiceSupport.dispose(localTransport);
- log.warn("Could not connect to remote URI: " + connectUri + ": " + e, e);
+ ServiceSupport.dispose(remoteTransport);
+ log.warn("Could not connect to local URI: " + localURI + ": " + e.getMessage());
+ log.debug("Connection failure exception: "+ e, e);
return;
}
@@ -117,7 +123,13 @@
catch (Exception e) {
ServiceSupport.dispose(localTransport);
ServiceSupport.dispose(remoteTransport);
- log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e, e);
+ log.warn("Could not start network bridge between: " + localURI + " and: " + uri + " due to: " + e);
+ log.debug("Start failure exception: "+ e, e);
+
+ try {
+ discoveryAgent.serviceFailed(event);
+ } catch (IOException e1) {
+ }
return;
}
}
@@ -196,45 +208,81 @@
if (conduitSubscriptions) {
if (dynamicOnly) {
result = new ConduitBridge(localTransport, remoteTransport) {
- protected void serviceRemoteException(Exception error) {
- super.serviceRemoteException(error);
- try {
- // Notify the discovery agent that the remote broker
- // failed.
- discoveryAgent.serviceFailed(event);
- }
- catch (IOException e) {
- }
- }
+ protected void serviceLocalException(Throwable error) {
+ try {
+ super.serviceLocalException(error);
+ } finally {
+ fireServiceFailed();
+ }
+ }
+ protected void serviceRemoteException(Throwable error) {
+ try {
+ super.serviceRemoteException(error);
+ } finally {
+ fireServiceFailed();
+ }
+ }
+ public void fireServiceFailed() {
+ if( !isStopped() ) {
+ try {
+ discoveryAgent.serviceFailed(event);
+ } catch (IOException e) {
+ }
+ }
+ }
};
}
else {
result = new DurableConduitBridge(localTransport, remoteTransport) {
- protected void serviceRemoteException(Exception error) {
- super.serviceRemoteException(error);
- try {
- // Notify the discovery agent that the remote broker
- // failed.
- discoveryAgent.serviceFailed(event);
- }
- catch (IOException e) {
- }
- }
+ protected void serviceLocalException(Throwable error) {
+ try {
+ super.serviceLocalException(error);
+ } finally {
+ fireServiceFailed();
+ }
+ }
+ protected void serviceRemoteException(Throwable error) {
+ try {
+ super.serviceRemoteException(error);
+ } finally {
+ fireServiceFailed();
+ }
+ }
+ public void fireServiceFailed() {
+ if( !isStopped() ) {
+ try {
+ discoveryAgent.serviceFailed(event);
+ } catch (IOException e) {
+ }
+ }
+ }
};
}
}
else {
- result = new DemandForwardingBridge(localTransport, remoteTransport) {
- protected void serviceRemoteException(Exception error) {
- super.serviceRemoteException(error);
- try {
- // Notify the discovery agent that the remote broker
- // failed.
- discoveryAgent.serviceFailed(event);
- }
- catch (IOException e) {
- }
- }
+ result = new DemandForwardingBridge(localTransport, remoteTransport) {
+ protected void serviceLocalException(Throwable error) {
+ try {
+ super.serviceLocalException(error);
+ } finally {
+ fireServiceFailed();
+ }
+ }
+ protected void serviceRemoteException(Throwable error) {
+ try {
+ super.serviceRemoteException(error);
+ } finally {
+ fireServiceFailed();
+ }
+ }
+ public void fireServiceFailed() {
+ if( !isStopped() ) {
+ try {
+ discoveryAgent.serviceFailed(event);
+ } catch (IOException e) {
+ }
+ }
+ }
};
}
return configureBridge(result);