You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2007/07/24 19:43:27 UTC

svn commit: r559132 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java

Author: chirino
Date: Tue Jul 24 10:43:26 2007
New Revision: 559132

URL: http://svn.apache.org/viewvc?view=rev&rev=559132
Log:
Fix for AMQ-1342 - Added backoff delay in generating discovery events when broker failures are reported

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java?view=diff&rev=559132&r1=559131&r2=559132
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java Tue Jul 24 10:43:26 2007
@@ -57,11 +57,102 @@
     private static final String DELIMITER = "%";
     private static final int BUFF_SIZE=8192;
     private static final int DEFAULT_IDLE_TIME=500;
-    private static final int HEARTBEAT_MISS_BEFORE_DEATH=4;
+    private static final int HEARTBEAT_MISS_BEFORE_DEATH=10; // keep-alive intervals without a heartbeat before a service is expired
+    // Back-off settings applied when serviceFailed() reports a remote broker failure (delays in ms).
+    private long initialReconnectDelay = 1000*5;
+    private long maxReconnectDelay = 1000 * 30;
+    private long backOffMultiplier = 2;
+    private boolean useExponentialBackOff = false;
+    private int maxReconnectAttempts; // <= 0 means no limit on recovery attempts
+    
+    
+    /** Per-service heartbeat and failure/back-off state for a remote broker heard via multicast. */
+    class RemoteBrokerData {
+        final String brokerName;
+        final String service;
+        long lastHeartBeat;
+        long recoveryTime;
+        int failureCount;
+        boolean failed;
+
+        public RemoteBrokerData(String brokerName, String service) {
+            this.brokerName = brokerName;
+            this.service = service;
+            this.lastHeartBeat = System.currentTimeMillis();
+        }
+
+        /** Records a heartbeat; resets failureCount once 60s pass after recoveryTime with no new failure. */
+        synchronized public void updateHeartBeat() {
+            lastHeartBeat = System.currentTimeMillis();
+            // Consider that the broker recovery has succeeded if it has not failed in 60 seconds.
+            if (!failed && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
+                if (log.isDebugEnabled())
+                    log.debug("I now think that the " + service + " service has recovered.");
+                failureCount = 0;
+                recoveryTime = 0;
+            }
+        }
+
+        synchronized public long getLastHeartBeat() {
+            return lastHeartBeat;
+        }
+
+        /**
+         * Marks the broker failed and schedules when add events may resume. The first failure
+         * waits initialReconnectDelay ms; with exponential back-off each further failure multiplies
+         * that by backOffMultiplier, capped at maxReconnectDelay ms.
+         * @return true if this call marked the broker failed, false if it was already failed.
+         */
+        synchronized public boolean markFailed() {
+            if (failed)
+                return false;
+            failed = true;
+            failureCount++;
+            long reconnectDelay = initialReconnectDelay;
+            if (useExponentialBackOff) {
+                // Scale the initial delay; a bare multiplier^count would start at a few milliseconds.
+                double scaled = initialReconnectDelay * Math.pow(backOffMultiplier, failureCount - 1);
+                reconnectDelay = (long) Math.min(scaled, (double) maxReconnectDelay);
+            }
+            if (log.isDebugEnabled())
+                log.debug("Remote failure of "+service+" while still receiving multicast advertisements.  Advertising events will be suppressed for "+reconnectDelay+" ms, the current failure count is: "+failureCount);
+            recoveryTime = System.currentTimeMillis() + reconnectDelay;
+            return true;
+        }
+
+        /**
+         * @return true if this broker is marked failed and it is now the right time to start recovery.
+         */
+        synchronized public boolean doRecovery() {
+            if (!failed)
+                return false;
+            // Give up once failureCount exceeds maxReconnectAttempts (a value <= 0 means no limit).
+            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
+                if (log.isDebugEnabled())
+                    log.debug("Max reconnect attempts of the "+service+" service has been reached.");
+                return false;
+            }
+            // Is it not yet time?
+            if (System.currentTimeMillis() < recoveryTime)
+                return false;
+            if (log.isDebugEnabled())
+                log.debug("Resuming event advertisement of the "+service+" service.");
+            failed = false;
+            return true;
+        }
+
+        /**
+         * Synchronized like the other accessors so that a flag written under the lock by
+         * markFailed()/doRecovery() is visible to the thread calling this.
+         */
+        synchronized public boolean isFailed() {
+            return failed;
+        }
+    }
+    
     private int timeToLive=1;
     private boolean loopBackMode=false;
-    private Map services=new ConcurrentHashMap();
-    private Map brokers = new ConcurrentHashMap();
+    private Map brokersByService=new ConcurrentHashMap(); // service (String) -> RemoteBrokerData
     private String group="default";
     private String brokerName;
     private URI discoveryURI;
@@ -325,66 +416,37 @@
 
     private void processAlive(String brokerName,String service){
         if(selfService == null || !service.equals(selfService)){
-            AtomicLong lastKeepAlive=(AtomicLong) services.get(service);
-            if(lastKeepAlive==null){
-                brokers.put(service, brokerName);
-                if(discoveryListener!=null){
-                    final DiscoveryEvent event=new DiscoveryEvent(service);
-                    event.setBrokerName(brokerName);
-                    
-                    // Have the listener process the event async so that 
-                    // he does not block this thread since we are doing time sensitive
-                    // processing of events.
-                    executor.execute(new Runnable() {
-                        public void run() {
-                            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
-                            if(discoveryListener!=null){
-                                discoveryListener.onServiceAdd(event);
-                            }
-                        }
-                    });
-                }
-                lastKeepAlive=new AtomicLong(System.currentTimeMillis());
-                services.put(service,lastKeepAlive);
+            RemoteBrokerData data = (RemoteBrokerData) brokersByService.get(service);
+            if (data == null) { // first advertisement seen from this service
+                data = new RemoteBrokerData(brokerName, service);
+                brokersByService.put(service, data);
+                fireServiceAddEvent(data);
                 doAdvertizeSelf();
                 
+            } else { // known service: refresh its heartbeat and re-add it once its failure back-off has expired
+                data.updateHeartBeat();
+                if (data.doRecovery()) {
+                    fireServiceAddEvent(data);
+                }
             }
-            lastKeepAlive.set(System.currentTimeMillis());
         }
     }
 
     private void processDead(String brokerName,String service){
         if(!service.equals(selfService)){
-            if(services.remove(service)!=null){
-                brokers.remove(service);
-                if(discoveryListener!=null){
-                    final DiscoveryEvent event=new DiscoveryEvent(service);
-                    event.setBrokerName(brokerName);
-                    
-                    // Have the listener process the event async so that 
-                    // he does not block this thread since we are doing time sensitive
-                    // processing of events.
-                    executor.execute(new Runnable() {
-                        public void run() {
-                            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener;
-                            if(discoveryListener!=null){
-                                discoveryListener.onServiceRemove(event);
-                            }
-                        }
-                    });
-                }
+        	RemoteBrokerData data = (RemoteBrokerData) brokersByService.remove(service); // the brokerName parameter is no longer used; the event carries data.brokerName
+            if(data!=null && !data.isFailed() ){ // a failed broker already got its remove event when serviceFailed() marked it
+                fireServiceRemovedEvent(data);
             }
         }
     }
 
     private void doExpireOldServices(){
         long expireTime=System.currentTimeMillis()-(keepAliveInterval*HEARTBEAT_MISS_BEFORE_DEATH);
-        for(Iterator i=services.entrySet().iterator();i.hasNext();){
-            Map.Entry entry=(Map.Entry) i.next();
-            AtomicLong lastHeartBeat=(AtomicLong) entry.getValue();
-            if(lastHeartBeat.get()<expireTime){
-                String brokerName = (String)brokers.get(entry.getKey());
-                processDead(brokerName,entry.getKey().toString());
+        for (Iterator i = brokersByService.values().iterator(); i.hasNext();) { // ConcurrentHashMap iterators are weakly consistent, so processDead()'s remove() is safe here
+            RemoteBrokerData data = (RemoteBrokerData) i.next();
+            if (data.getLastHeartBeat() < expireTime) { // missed HEARTBEAT_MISS_BEFORE_DEATH keep-alive intervals
+                processDead(data.brokerName, data.service); // pass the remote broker's name, not this agent's own brokerName field
             }
         }
     }
@@ -400,6 +462,86 @@
     }
 
     public void serviceFailed(DiscoveryEvent event) throws IOException {
-        processDead(event.getBrokerName(), event.getServiceName());
-    }
+    	RemoteBrokerData data = (RemoteBrokerData)brokersByService.get(event.getServiceName()); // the entry is kept so heartbeats keep updating it
+        if(data!=null && data.markFailed() ) { // only the first report per failure fires; add events are then held back until doRecovery() allows them
+            fireServiceRemovedEvent(data);
+        }
+    }
+	/** Asynchronously tells the discovery listener, if one is set, that data's service was removed. */
+	private void fireServiceRemovedEvent(RemoteBrokerData data) {
+		if( discoveryListener!=null){
+		    final DiscoveryEvent event=new DiscoveryEvent(data.service);
+		    event.setBrokerName(data.brokerName);
+		    
+		    // Hand the event to the executor so the listener runs asynchronously
+		    // and cannot block this thread, which does time-sensitive
+		    // processing of discovery events.
+		    executor.execute(new Runnable() {
+		        public void run() {
+		            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; // re-read: the listener may have changed since the task was queued
+		            if(discoveryListener!=null){
+		                discoveryListener.onServiceRemove(event);
+		            }
+		        }
+		    });
+		}
+	}
+	private void fireServiceAddEvent(RemoteBrokerData data) { // asynchronously tells the listener, if set, that data's service was added
+		if( discoveryListener!=null){
+		    final DiscoveryEvent event=new DiscoveryEvent(data.service);
+		    event.setBrokerName(data.brokerName);
+		    
+		    // Hand the event to the executor so the listener runs asynchronously
+		    // and cannot block this thread, which does time-sensitive
+		    // processing of discovery events.
+		    executor.execute(new Runnable() {
+		        public void run() {
+		            DiscoveryListener discoveryListener = MulticastDiscoveryAgent.this.discoveryListener; // re-read: the listener may have changed since the task was queued
+		            if(discoveryListener!=null){
+		                discoveryListener.onServiceAdd(event);
+		            }
+		        }
+		    });
+		}
+	}
+	// Base of the exponential back-off computed in RemoteBrokerData.markFailed() (default 2).
+	public long getBackOffMultiplier() {
+		return backOffMultiplier;
+	}
+
+	public void setBackOffMultiplier(long backOffMultiplier) {
+		this.backOffMultiplier = backOffMultiplier;
+	}
+	// Fixed suppression delay in ms used by markFailed() when exponential back-off is off (default 5000).
+	public long getInitialReconnectDelay() {
+		return initialReconnectDelay;
+	}
+
+	public void setInitialReconnectDelay(long initialReconnectDelay) {
+		this.initialReconnectDelay = initialReconnectDelay;
+	}
+	// Failure count beyond which doRecovery() stops re-advertising a broker; <= 0 means no limit (default 0).
+	public int getMaxReconnectAttempts() {
+		return maxReconnectAttempts;
+	}
+
+	public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+		this.maxReconnectAttempts = maxReconnectAttempts;
+	}
+	// Upper bound in ms on the exponential back-off delay (default 30000).
+	public long getMaxReconnectDelay() {
+		return maxReconnectDelay;
+	}
+
+	public void setMaxReconnectDelay(long maxReconnectDelay) {
+		this.maxReconnectDelay = maxReconnectDelay;
+	}
+	// Whether markFailed() grows the suppression delay with each failure instead of using a fixed delay (default false).
+	public boolean isUseExponentialBackOff() {
+		return useExponentialBackOff;
+	}
+
+	public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+		this.useExponentialBackOff = useExponentialBackOff;
+	}
 }