You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/07/24 19:43:27 UTC
svn commit: r559132 -
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Author: chirino
Date: Tue Jul 24 10:43:26 2007
New Revision: 559132
URL: http://svn.apache.org/viewvc?view=rev&rev=559132
Log:
Fix for AMQ-1342 - Added backoff delay in generating discovery events when broker failures are reported
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?view=diff&rev=559132&r1=559131&r2=559132
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Tue Jul 24 10:43:26 2007
@@ -57,11 +57,102 @@
private static final String DELIMITER = "%";
private static final int BUFF_SIZE=8192;
private static final int DEFAULT_IDLE_TIME=500;
- private static final int HEARTBEAT_MISS_BEFORE_DEATH=4;
+ private static final int HEARTBEAT_MISS_BEFORE_DEATH=10;
+
+ private long initialReconnectDelay = 1000*5;
+ private long maxReconnectDelay = 1000 * 30;
+ private long backOffMultiplier = 2;
+ private boolean useExponentialBackOff = false;
+ private int maxReconnectAttempts;
+
+
+ class RemoteBrokerData {
+ final String brokerName;
+ final String service;
+ long lastHeartBeat;
+ long recoveryTime;
+ int failureCount;
+ boolean failed;
+
+ public RemoteBrokerData(String brokerName, String service) {
+ this.brokerName=brokerName;
+ this.service=service;
+ this.lastHeartBeat=System.currentTimeMillis();
+ }
+
+ synchronized public void updateHeartBeat() {
+ lastHeartBeat= System.currentTimeMillis();
+
+ // Consider that the broker recovery has succeeded if it has not failed in 60 seconds.
+ if( !failed && failureCount>0 && (lastHeartBeat-recoveryTime) > 1000*60 ) {
+ if(log.isDebugEnabled())
+ log.debug("I now think that the "+service+" service has recovered.");
+ failureCount=0;
+ recoveryTime=0;
+ }
+ }
+
+ synchronized public long getLastHeartBeat() {
+ return lastHeartBeat;
+ }
+
+ synchronized public boolean markFailed() {
+ if ( !failed ) {
+ failed=true;
+ failureCount++;
+
+ long reconnectDelay;
+ if (!useExponentialBackOff) {
+ reconnectDelay = initialReconnectDelay;
+ } else {
+ reconnectDelay = (long)Math.pow(backOffMultiplier, failureCount);
+ if(reconnectDelay>maxReconnectDelay)
+ reconnectDelay=maxReconnectDelay;
+ }
+
+ if(log.isDebugEnabled())
+ log.debug("Remote failure of "+service+" while still receiving multicast advertisements. Advertising events will be suppressed for "+reconnectDelay+" ms, the current failure count is: "+failureCount);
+
+ recoveryTime = System.currentTimeMillis()+reconnectDelay;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * @return true if this broker is marked failed and it is now the right time to start recovery.
+ */
+ synchronized public boolean doRecovery() {
+ if( !failed )
+ return false;
+
+ // Are we done trying to recover this guy?
+ if( maxReconnectAttempts>0 && failureCount > maxReconnectAttempts ) {
+ if(log.isDebugEnabled())
+ log.debug("Max reconnect attempts of the "+service+" service has been reached.");
+ return false;
+ }
+
+ // Is it not yet time?
+ if( System.currentTimeMillis() < recoveryTime )
+ return false;
+
+ if(log.isDebugEnabled())
+ log.debug("Resuming event advertisement of the "+service+" service.");
+
+
+ failed=false;
+ return true;
+ }
+
+ public boolean isFailed() {
+ return failed;
+ }
+ }
+
private int timeToLive=1;
private boolean loopBackMode=false;
- private Map services=new ConcurrentHashMap();
- private Map brokers = new ConcurrentHashMap();
+ private Map brokersByService=new ConcurrentHashMap();
private String group="default";
private String brokerName;
private URI discoveryURI;
@@ -325,66 +416,37 @@
private void processAlive(String brokerName,String service){
if(selfService == null || !service.equals(selfService)){
- AtomicLong lastKeepAlive=(AtomicLong) services.get(service);
- if(lastKeepAlive==null){
- brokers.put(service, brokerName);
- if(discoveryListener!=null){
- final DiscoveryEvent event=new DiscoveryEvent(service);
- event.setBrokerName(brokerName);
-
- // Have the listener process the event async so that
- // he does not block this thread since we are doing time sensitive
- // processing of events.
- executor.execute(new Runnable() {
- public void run() {
- DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
- if(discoveryListener!=null){
- discoveryListener.onServiceAdd(event);
- }
- }
- });
- }
- lastKeepAlive=new AtomicLong(System.currentTimeMillis());
- services.put(service,lastKeepAlive);
+ RemoteBrokerData data = (RemoteBrokerData)brokersByService.get(service);
+ if(data==null){
+ data = new RemoteBrokerData(brokerName, service);
+ brokersByService.put(service,data);;
+ fireServiceAddEvent(data);
doAdvertizeSelf();
+ } else {
+ data.updateHeartBeat();
+ if( data.doRecovery() ) {
+ fireServiceAddEvent(data);
+ }
}
- lastKeepAlive.set(System.currentTimeMillis());
}
}
private void processDead(String brokerName,String service){
if(!service.equals(selfService)){
- if(services.remove(service)!=null){
- brokers.remove(service);
- if(discoveryListener!=null){
- final DiscoveryEvent event=new DiscoveryEvent(service);
- event.setBrokerName(brokerName);
-
- // Have the listener process the event async so that
- // he does not block this thread since we are doing time sensitive
- // processing of events.
- executor.execute(new Runnable() {
- public void run() {
- DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
- if(discoveryListener!=null){
- discoveryListener.onServiceRemove(event);
- }
- }
- });
- }
+ RemoteBrokerData data = (RemoteBrokerData) brokersByService.remove(service);
+ if(data!=null && !data.isFailed() ){
+ fireServiceRemovedEvent(data);
}
}
}
private void doExpireOldServices(){
long expireTime=System.currentTimeMillis()-(keepAliveInterval*HEARTBEAT_MISS_BEFORE_DEATH);
- for(Iterator i=services.entrySet().iterator();i.hasNext();){
- Map.Entry entry=(Map.Entry) i.next();
- AtomicLong lastHeartBeat=(AtomicLong) entry.getValue();
- if(lastHeartBeat.get()<expireTime){
- String brokerName = (String)brokers.get(entry.getKey());
- processDead(brokerName,entry.getKey().toString());
+ for(Iterator i=brokersByService.values().iterator();i.hasNext();){
+ RemoteBrokerData data=(RemoteBrokerData)i.next();
+ if( data.getLastHeartBeat() < expireTime){
+ processDead(brokerName, data.service);
}
}
}
@@ -400,6 +462,86 @@
}
public void serviceFailed(DiscoveryEvent event) throws IOException {
- processDead(event.getBrokerName(), event.getServiceName());
- }
+ RemoteBrokerData data = (RemoteBrokerData)brokersByService.get(event.getServiceName());
+ if(data!=null && data.markFailed() ) {
+ fireServiceRemovedEvent(data);
+ }
+ }
+
+ private void fireServiceRemovedEvent(RemoteBrokerData data) {
+ if( discoveryListener!=null){
+ final DiscoveryEvent event=new DiscoveryEvent(data.service);
+ event.setBrokerName(data.brokerName);
+
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ executor.execute(new Runnable() {
+ public void run() {
+ DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
+ if(discoveryListener!=null){
+ discoveryListener.onServiceRemove(event);
+ }
+ }
+ });
+ }
+ }
+ private void fireServiceAddEvent(RemoteBrokerData data) {
+ if( discoveryListener!=null){
+ final DiscoveryEvent event=new DiscoveryEvent(data.service);
+ event.setBrokerName(data.brokerName);
+
+ // Have the listener process the event async so that
+ // he does not block this thread since we are doing time sensitive
+ // processing of events.
+ executor.execute(new Runnable() {
+ public void run() {
+ DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
+ if(discoveryListener!=null){
+ discoveryListener.onServiceAdd(event);
+ }
+ }
+ });
+ }
+ }
+
+ public long getBackOffMultiplier() {
+ return backOffMultiplier;
+ }
+
+ public void setBackOffMultiplier(long backOffMultiplier) {
+ this.backOffMultiplier = backOffMultiplier;
+ }
+
+ public long getInitialReconnectDelay() {
+ return initialReconnectDelay;
+ }
+
+ public void setInitialReconnectDelay(long initialReconnectDelay) {
+ this.initialReconnectDelay = initialReconnectDelay;
+ }
+
+ public int getMaxReconnectAttempts() {
+ return maxReconnectAttempts;
+ }
+
+ public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+ this.maxReconnectAttempts = maxReconnectAttempts;
+ }
+
+ public long getMaxReconnectDelay() {
+ return maxReconnectDelay;
+ }
+
+ public void setMaxReconnectDelay(long maxReconnectDelay) {
+ this.maxReconnectDelay = maxReconnectDelay;
+ }
+
+ public boolean isUseExponentialBackOff() {
+ return useExponentialBackOff;
+ }
+
+ public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+ this.useExponentialBackOff = useExponentialBackOff;
+ }
}