You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/07/11 06:38:19 UTC
svn commit: r420718 -
/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Author: chirino
Date: Mon Jul 10 21:38:18 2006
New Revision: 420718
URL: http://svn.apache.org/viewvc?rev=420718&view=rev
Log:
Added reconnect logic.
http://issues.apache.org/activemq/browse/AMQ-803
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java?rev=420718&r1=420717&r2=420718&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/simple/SimpleDiscoveryAgent.java Mon Jul 10 21:38:18 2006
@@ -23,6 +23,8 @@
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryListener;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
/**
* A simple DiscoveryAgent that allows static configuration of the discovered services.
*
@@ -30,9 +32,31 @@
*/
public class SimpleDiscoveryAgent implements DiscoveryAgent {
+ private long initialReconnectDelay = 1000*5;
+ private long maxReconnectDelay = 1000 * 30;
+ private long backOffMultiplier = 2;
+ private boolean useExponentialBackOff = false;
+ private int maxReconnectAttempts;
+ private final Object sleepMutex = new Object();
+ private long minConnectTime = 500;
+
private DiscoveryListener listener;
String services[] = new String[] {};
String group = "DEFAULT";
+ private final AtomicBoolean running = new AtomicBoolean(false);
+
+ class SimpleDiscoveryEvent extends DiscoveryEvent {
+
+ private int connectFailures;
+ private long reconnectDelay = initialReconnectDelay;
+ private long connectTime = System.currentTimeMillis();
+ private AtomicBoolean failed = new AtomicBoolean(false);
+
+ public SimpleDiscoveryEvent(String service) {
+ super(service);
+ }
+
+ }
public void setDiscoveryListener(DiscoveryListener listener) {
this.listener = listener;
@@ -42,12 +66,17 @@
}
public void start() throws Exception {
+ running.set(true);
for (int i = 0; i < services.length; i++) {
- listener.onServiceAdd(new DiscoveryEvent(services[i]));
+ listener.onServiceAdd(new SimpleDiscoveryEvent(services[i]));
}
}
public void stop() throws Exception {
+ running.set(false);
+ synchronized(sleepMutex) {
+ sleepMutex.notifyAll();
+ }
}
public String[] getServices() {
@@ -80,7 +109,112 @@
public void setBrokerName(String brokerName) {
}
- public void serviceFailed(DiscoveryEvent event) throws IOException {
+ public void serviceFailed(DiscoveryEvent devent) throws IOException {
+
+ final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
+ if( event.failed.compareAndSet(false, true) ) {
+
+ listener.onServiceRemove(event);
+ Thread thread = new Thread() {
+ public void run() {
+
+
+ // A service that fails within minConnectTime of its last connect is treated
+ // as a failed connection attempt.
+ if( event.connectTime + minConnectTime > System.currentTimeMillis() ) {
+
+ event.connectFailures++;
+
+ if( maxReconnectAttempts>0 && event.connectFailures >= maxReconnectAttempts ) {
+ // Don't try to re-connect
+ return;
+ }
+
+ synchronized(sleepMutex){
+ try{
+ if( !running.get() )
+ return;
+
+ sleepMutex.wait(event.reconnectDelay);
+ }catch(InterruptedException ie){
+ return;
+ }
+ }
+
+ if (!useExponentialBackOff) {
+ event.reconnectDelay = initialReconnectDelay;
+ } else {
+ // Exponential increment of reconnect delay.
+ event.reconnectDelay*=backOffMultiplier;
+ if(event.reconnectDelay>maxReconnectDelay)
+ event.reconnectDelay=maxReconnectDelay;
+ }
+
+ } else {
+ event.connectFailures = 0;
+ event.reconnectDelay = initialReconnectDelay;
+ }
+
+ if( !running.get() )
+ return;
+
+ event.connectTime = System.currentTimeMillis();
+ event.failed.set(false);
+
+ listener.onServiceAdd(event);
+ }
+ };
+ thread.setDaemon(true);
+ thread.start();
+ }
}
+
+ public long getBackOffMultiplier() {
+ return backOffMultiplier;
+ }
+
+ public void setBackOffMultiplier(long backOffMultiplier) {
+ this.backOffMultiplier = backOffMultiplier;
+ }
+
+ public long getInitialReconnectDelay() {
+ return initialReconnectDelay;
+ }
+
+ public void setInitialReconnectDelay(long initialReconnectDelay) {
+ this.initialReconnectDelay = initialReconnectDelay;
+ }
+
+ public int getMaxReconnectAttempts() {
+ return maxReconnectAttempts;
+ }
+
+ public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+ this.maxReconnectAttempts = maxReconnectAttempts;
+ }
+
+ public long getMaxReconnectDelay() {
+ return maxReconnectDelay;
+ }
+
+ public void setMaxReconnectDelay(long maxReconnectDelay) {
+ this.maxReconnectDelay = maxReconnectDelay;
+ }
+
+ public long getMinConnectTime() {
+ return minConnectTime;
+ }
+
+ public void setMinConnectTime(long minConnectTime) {
+ this.minConnectTime = minConnectTime;
+ }
+
+ public boolean isUseExponentialBackOff() {
+ return useExponentialBackOff;
+ }
+
+ public void setUseExponentialBackOff(boolean useExponentialBackOff) {
+ this.useExponentialBackOff = useExponentialBackOff;
+ }
}