You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/11 06:37:02 UTC

svn commit: r420717 - /incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java

Author: chirino
Date: Mon Jul 10 21:37:01 2006
New Revision: 420717

URL: http://svn.apache.org/viewvc?rev=420717&view=rev
Log:
Added reconnect logic.
http://issues.apache.org/activemq/browse/AMQ-803

Modified:
    incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java

Modified: incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=420717&r1=420716&r2=420717&view=diff
==============================================================================
--- incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original)
+++ incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Mon Jul 10 21:37:01 2006
@@ -23,6 +23,8 @@
 import org.apache.activemq.transport.discovery.DiscoveryAgent;
 import org.apache.activemq.transport.discovery.DiscoveryListener;
 
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * A simple DiscoveryAgent that allows static configuration of the discovered services.
  * 
@@ -30,9 +32,31 @@
  */
 public class SimpleDiscoveryAgent implements DiscoveryAgent {
     
+    private long initialReconnectDelay = 1000*5; // ms (5 s); starting value of each event's reconnectDelay
+    private long maxReconnectDelay = 1000 * 30; // ms (30 s); upper bound applied to reconnectDelay when backing off exponentially
+    private long backOffMultiplier = 2; // factor reconnectDelay is multiplied by when useExponentialBackOff is true
+    private boolean useExponentialBackOff = false; // when false, serviceFailed() keeps reconnectDelay at initialReconnectDelay
+    private int maxReconnectAttempts; // defaults to 0; serviceFailed() only enforces the limit when this is > 0
+    private final Object sleepMutex = new Object(); // monitor the retry threads wait on; stop() notifies it to wake them
+    private long minConnectTime = 500; // ms; a failure reported sooner than this after connectTime counts as a failed connect attempt
+
     private DiscoveryListener listener;
     String services[] = new String[] {};
     String group = "DEFAULT";
+    private final AtomicBoolean running = new AtomicBoolean(false); // set true by start(), false by stop(); checked by retry threads
+    
+    class SimpleDiscoveryEvent extends DiscoveryEvent { // DiscoveryEvent that also carries per-service reconnect bookkeeping
+		
+    	private int connectFailures; // incremented by serviceFailed() for each quick failure, reset to 0 otherwise
+        private long reconnectDelay = initialReconnectDelay; // ms the retry thread waits before re-adding the service
+        private long connectTime = System.currentTimeMillis(); // wall-clock ms when this event was created or last re-announced
+        private AtomicBoolean failed = new AtomicBoolean(false); // true while a failure is being handled; guards against duplicate retries
+
+        public SimpleDiscoveryEvent(String service) { // service: passed unchanged to the DiscoveryEvent constructor
+			super(service);
+		}
+        
+    }
     
     public void setDiscoveryListener(DiscoveryListener listener) {
         this.listener = listener;
@@ -42,12 +66,17 @@
     }
     
     public void start() throws Exception {
+    	running.set(true); // mark the agent as running before announcing services, so retry threads are allowed to proceed
         for (int i = 0; i < services.length; i++) {
-            listener.onServiceAdd(new DiscoveryEvent(services[i]));
+            listener.onServiceAdd(new SimpleDiscoveryEvent(services[i])); // announce each configured service with an event that tracks reconnect state
         }
     }
     
     public void stop() throws Exception {
+    	running.set(false); // retry threads check this flag before waiting and before re-adding a service
+    	synchronized(sleepMutex) { // notifyAll() requires holding the monitor
+    		sleepMutex.notifyAll(); // wake every retry thread currently waiting out its reconnect delay
+    	}
     }
   
     public String[] getServices() {
@@ -80,7 +109,112 @@
     public void setBrokerName(String brokerName) { // intentionally empty: the argument is ignored
     }
 
-    public void serviceFailed(DiscoveryEvent event) throws IOException {
+    public void serviceFailed(DiscoveryEvent devent) throws IOException { // removes the failed service, then re-adds it from a background thread
+    	
+        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent; // assumes the event was created by start(); any other type throws ClassCastException
+        if( event.failed.compareAndSet(false, true) ) { // only the first report proceeds; later ones are ignored until failed is reset below
+        	
+			listener.onServiceRemove(event); // tell the listener the service is gone before scheduling the retry
+	    	Thread thread = new Thread() { // one retry thread per handled failure
+	    		public void run() {
+	
+	
+	    			// A failure reported within minConnectTime ms of connectTime is treated
+	    			// as a failed connection attempt rather than a dropped connection.
+	    			if( event.connectTime + minConnectTime > System.currentTimeMillis()  ) {
+	    				
+	    				event.connectFailures++;
+	    				
+	    				if( maxReconnectAttempts>0 &&  event.connectFailures >= maxReconnectAttempts ) {
+	    					// Don't try to re-connect; note that event.failed stays true from here on.
+	    					return;
+	    				}
+	    				
+		                synchronized(sleepMutex){ // wait() requires holding the monitor
+		                    try{
+		                    	if( !running.get() ) // agent already stopped: skip the delay and give up
+		                    		return;
+		                    	
+		                        sleepMutex.wait(event.reconnectDelay); // returns after the delay, or earlier when stop() calls notifyAll()
+		                    }catch(InterruptedException ie){
+		                       return; // interrupted while waiting: abandon this retry
+		                    }
+		                }
+	
+		                if (!useExponentialBackOff) { // the delay is updated after the wait, so it applies to the next failure
+		                    event.reconnectDelay = initialReconnectDelay;
+		                } else {
+		                    // Exponential increment of reconnect delay, capped at maxReconnectDelay.
+		                    event.reconnectDelay*=backOffMultiplier;
+		                    if(event.reconnectDelay>maxReconnectDelay)
+		                        event.reconnectDelay=maxReconnectDelay;
+		                }
+		                
+	    			} else { // the service stayed up for at least minConnectTime: reset the bookkeeping and retry without waiting
+	    				event.connectFailures = 0;
+	                    event.reconnectDelay = initialReconnectDelay;
+	    			}
+	    			                    			
+	            	if( !running.get() ) // re-check: stop() may have been called while this thread was waiting
+	            		return;
+	            	
+	    			event.connectTime = System.currentTimeMillis(); // start timing the new attempt
+	    			event.failed.set(false); // allow the next failure of this event to be handled
+	    			
+	    			listener.onServiceAdd(event); // re-announce the same event object; presumably this triggers a new connection attempt
+	    		}
+	    	};
+	    	thread.setDaemon(true); // daemon thread, so a pending retry does not keep the JVM alive
+	    	thread.start();
+        }
     }
+
+	public long getBackOffMultiplier() {
+		return backOffMultiplier;
+	}
+
+	public void setBackOffMultiplier(long backOffMultiplier) { // factor applied to an event's reconnectDelay when exponential back-off is enabled
+		this.backOffMultiplier = backOffMultiplier;
+	}
+
+	public long getInitialReconnectDelay() {
+		return initialReconnectDelay;
+	}
+
+	public void setInitialReconnectDelay(long initialReconnectDelay) { // ms; copied into an event's reconnectDelay at creation and on each reset in serviceFailed()
+		this.initialReconnectDelay = initialReconnectDelay;
+	}
+
+	public int getMaxReconnectAttempts() {
+		return maxReconnectAttempts;
+	}
+
+	public void setMaxReconnectAttempts(int maxReconnectAttempts) { // values <= 0 disable the limit (serviceFailed() checks maxReconnectAttempts>0)
+		this.maxReconnectAttempts = maxReconnectAttempts;
+	}
+
+	public long getMaxReconnectDelay() {
+		return maxReconnectDelay;
+	}
+
+	public void setMaxReconnectDelay(long maxReconnectDelay) { // ms; cap used only on the exponential back-off path
+		this.maxReconnectDelay = maxReconnectDelay;
+	}
+
+	public long getMinConnectTime() {
+		return minConnectTime;
+	}
+
+	public void setMinConnectTime(long minConnectTime) { // ms; failures reported sooner than this after connectTime count as failed connect attempts
+		this.minConnectTime = minConnectTime;
+	}
+
+	public boolean isUseExponentialBackOff() {
+		return useExponentialBackOff;
+	}
+
+	public void setUseExponentialBackOff(boolean useExponentialBackOff) { // false keeps every retry delay at initialReconnectDelay
+		this.useExponentialBackOff = useExponentialBackOff;
+	}
     
 }