You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/12 18:57:02 UTC
svn commit: r933322 -
/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
Author: dblevins
Date: Mon Apr 12 16:57:01 2010
New Revision: 933322
URL: http://svn.apache.org/viewvc?rev=933322&view=rev
Log:
Reworked as part of previous commit introducing a TCP-based alternative to multicast.
Modified:
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java?rev=933322&r1=933321&r2=933322&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java Mon Apr 12 16:57:01 2010
@@ -35,17 +35,9 @@ import java.net.MulticastSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -63,52 +55,29 @@ public class MulticastDiscoveryAgent imp
private int timeToLive = 1;
private boolean loopbackMode = false;
private InetSocketAddress address;
-
- private Map<String, Service> registeredServices = new ConcurrentHashMap<String, Service>();
-
- private String group = "default";
- private String groupPrefix = group + ":";
-
- private int maxMissedHeartbeats = 10;
private long heartRate = 500;
private Tracker tracker;
private Multicast multicast;
- public MulticastDiscoveryAgent() {
- tracker = new Tracker();
- }
-
- // ---------------------------------
- // Listenting specific settings
- private long reconnectDelay = 1000 * 5;
- private long maxReconnectDelay = 1000 * 30;
- private long exponentialBackoff = 0;
- private boolean useExponentialBackOff;
- private int maxReconnectAttempts = 10; // todo: check this out
- // ---------------------------------
-
-
- public void init(Properties props) throws Exception {
-
- host = props.getProperty("bind", host);
- group = props.getProperty("group", group);
- groupPrefix = group + ":";
+ public void init(Properties props) {
Options options = new Options(props);
-
+ host = props.getProperty("bind", host);
+ loopbackMode = options.get("loopback_mode", loopbackMode);
port = options.get("port", port);
-
heartRate = options.get("heart_rate", heartRate);
- maxMissedHeartbeats = options.get("max_missed_heartbeats", maxMissedHeartbeats);
- loopbackMode = options.get("loopback_mode", loopbackMode);
- reconnectDelay = options.get("reconnect_delay", reconnectDelay);
- maxReconnectDelay = options.get("max_reconnect_delay", reconnectDelay);
- maxReconnectAttempts = options.get("max_reconnect_attempts", maxReconnectAttempts);
- exponentialBackoff = options.get("exponential_backoff", exponentialBackoff);
- useExponentialBackOff = (exponentialBackoff > 1);
+ Tracker.Builder builder = new Tracker.Builder();
+ builder.setGroup(props.getProperty("group", builder.getGroup()));
+ builder.setMaxMissedHeartbeats(options.get("max_missed_heartbeats", builder.getMaxMissedHeartbeats()));
+ builder.setMaxReconnectDelay(options.get("max_reconnect_delay", builder.getMaxReconnectDelay()));
+ builder.setReconnectDelay(options.get("reconnect_delay", builder.getReconnectDelay()));
+ builder.setExponentialBackoff(options.get("exponential_backoff", builder.getExponentialBackoff()));
+ builder.setMaxReconnectAttempts(options.get("max_reconnect_attempts", builder.getMaxReconnectAttempts()));
+
+ tracker = builder.build();
}
public String getIP() {
@@ -128,30 +97,17 @@ public class MulticastDiscoveryAgent imp
}
public void registerService(URI serviceUri) throws IOException {
- Service service = new Service(serviceUri);
- this.registeredServices.put(service.broadcastString, service);
- this.tracker.fireServiceAddedEvent(serviceUri);
+ tracker.registerService(serviceUri);
}
public void unregisterService(URI serviceUri) throws IOException {
- Service service = new Service(serviceUri);
- this.registeredServices.remove(service.broadcastString);
- this.tracker.fireServiceRemovedEvent(serviceUri);
+ tracker.unregisterService(serviceUri);
}
- public void reportFailed(URI serviceUri) throws IOException {
+ public void reportFailed(URI serviceUri) {
tracker.reportFailed(serviceUri);
}
-
- private boolean isSelf(Service service) {
- return isSelf(service.broadcastString);
- }
-
- private boolean isSelf(String service) {
- return registeredServices.keySet().contains(service);
- }
-
public static void main(String[] args) throws Exception {
}
@@ -191,117 +147,6 @@ public class MulticastDiscoveryAgent imp
public void service(Socket socket) throws ServiceException, IOException {
}
- class Service {
- private final URI uri;
- private final String broadcastString;
-
- public Service(URI uri) {
- this.uri = uri;
- this.broadcastString = groupPrefix + uri.toString();
- }
-
- public Service(String uriString) throws URISyntaxException {
- URI uri = new URI(uriString);
- uri = new URI(uri.getSchemeSpecificPart());
- this.uri = uri;
- this.broadcastString = uriString;
- }
- }
-
- private class ServiceVitals {
-
- private final Service service;
-
- private long lastHeartBeat;
- private long recoveryTime;
- private int failureCount;
- private boolean dead;
-
- public ServiceVitals(Service service) {
- this.service = service;
- this.lastHeartBeat = System.currentTimeMillis();
- }
-
- public synchronized void heartbeat() {
- lastHeartBeat = System.currentTimeMillis();
-
- // Consider that the service recovery has succeeded if it has not
- // failed in 60 seconds.
- if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
- if (log.isDebugEnabled()) {
- log.debug("I now think that the " + service + " service has recovered.");
- }
- failureCount = 0;
- recoveryTime = 0;
- }
- }
-
- public synchronized long getLastHeartbeat() {
- return lastHeartBeat;
- }
-
- public synchronized boolean pronounceDead() {
- if (!dead) {
- dead = true;
- failureCount++;
-
- long delay;
- if (useExponentialBackOff) {
- delay = (long) Math.pow(exponentialBackoff, failureCount);
- if (delay > maxReconnectDelay) {
- delay = maxReconnectDelay;
- }
- } else {
- delay = reconnectDelay;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Remote failure of " + service + " while still receiving multicast advertisements. " +
- "Advertising events will be suppressed for " + delay
- + " ms, the current failure count is: " + failureCount);
- }
-
- recoveryTime = System.currentTimeMillis() + delay;
- return true;
- }
- return false;
- }
-
- /**
- * @return true if this broker is marked failed and it is now the right
- * time to start recovery.
- */
- public synchronized boolean doRecovery() {
- if (!dead) {
- return false;
- }
-
- // Are we done trying to recover this guy?
- if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
- if (log.isDebugEnabled()) {
- log.debug("Max reconnect attempts of the " + service + " service has been reached.");
- }
- return false;
- }
-
- // Is it not yet time?
- if (System.currentTimeMillis() < recoveryTime) {
- return false;
- }
-
- if (log.isDebugEnabled()) {
- log.debug("Resuming event advertisement of the " + service + " service.");
- }
- dead = false;
- return true;
- }
-
- public boolean isDead() {
- return dead;
- }
- }
-
-
class Multicast {
private static final int BUFF_SIZE = 8192;
@@ -371,7 +216,7 @@ public class MulticastDiscoveryAgent imp
}
private void heartbeat() {
- for (String uri : registeredServices.keySet()) {
+ for (String uri : tracker.getRegisteredServices()) {
try {
byte[] data = uri.getBytes();
DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
@@ -396,142 +241,6 @@ public class MulticastDiscoveryAgent imp
}
}
- class Tracker {
- private Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
- private DiscoveryListener discoveryListener;
-
- public void setDiscoveryListener(DiscoveryListener discoveryListener) {
- this.discoveryListener = discoveryListener;
- }
-
- private void processData(String uriString) {
- if (discoveryListener == null) {
- return;
- }
-
- if (!uriString.startsWith(groupPrefix)){
- return;
- }
-
- if (isSelf(uriString)) {
- return;
- }
-
- ServiceVitals vitals = discoveredServices.get(uriString);
-
- if (vitals == null) {
- try {
- vitals = new ServiceVitals(new Service(uriString));
-
- discoveredServices.put(uriString, vitals);
-
- fireServiceAddedEvent(vitals.service.uri);
- } catch (URISyntaxException e) {
- // don't continuously log this
- }
-
- } else {
- vitals.heartbeat();
-
- if (vitals.doRecovery()) {
- fireServiceAddedEvent(vitals.service.uri);
- }
- }
- }
-
- private void checkServices() {
- long expireTime = System.currentTimeMillis() - (heartRate * maxMissedHeartbeats);
- for (ServiceVitals serviceVitals : discoveredServices.values()) {
- if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
-
- ServiceVitals vitals = discoveredServices.remove(serviceVitals.service.broadcastString);
- if (vitals != null && !vitals.isDead()) {
- fireServiceRemovedEvent(vitals.service.uri);
- }
- }
- }
- }
-
- private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
- public Thread newThread(Runnable runable) {
- Thread t = new Thread(runable, "Discovery Agent Notifier");
- t.setDaemon(true);
- return t;
- }
- });
-
- private void fireServiceRemovedEvent(final URI uri) {
- if (discoveryListener != null) {
- final DiscoveryListener discoveryListener = this.discoveryListener;
-
- // Have the listener process the event async so that
- // he does not block this thread since we are doing time sensitive
- // processing of events.
- executor.execute(new Runnable() {
- public void run() {
- if (discoveryListener != null) {
- discoveryListener.serviceRemoved(uri);
- }
- }
- });
- }
- }
-
- private void fireServiceAddedEvent(final URI uri) {
- if (discoveryListener != null) {
- final DiscoveryListener discoveryListener = this.discoveryListener;
-
- // Have the listener process the event async so that
- // he does not block this thread since we are doing time sensitive
- // processing of events.
- executor.execute(new Runnable() {
- public void run() {
- if (discoveryListener != null) {
- discoveryListener.serviceAdded(uri);
- }
- }
- });
- }
- }
-
- public void reportFailed(URI serviceUri) {
- final Service service = new Service(serviceUri);
- ServiceVitals serviceVitals = discoveredServices.get(service.broadcastString);
- if (serviceVitals != null && serviceVitals.pronounceDead()) {
- fireServiceRemovedEvent(service.uri);
- }
- }
- }
-
- //
- // Ordinary getters/setters
- //
-
- public long getExponentialBackoff() {
- return exponentialBackoff;
- }
-
- public void setExponentialBackoff(long exponentialBackoff) {
- this.exponentialBackoff = exponentialBackoff;
- this.useExponentialBackOff = (exponentialBackoff > 1);
- }
-
- public String getGroup() {
- return group;
- }
-
- public void setGroup(String group) {
- this.group = group;
- groupPrefix = group + ":";
- }
-
- public long getHeartRate() {
- return heartRate;
- }
-
- public void setHeartRate(long heartRate) {
- this.heartRate = heartRate;
- }
public String getHost() {
return host;
@@ -541,14 +250,6 @@ public class MulticastDiscoveryAgent imp
this.host = host;
}
- public long getReconnectDelay() {
- return reconnectDelay;
- }
-
- public void setReconnectDelay(long reconnectDelay) {
- this.reconnectDelay = reconnectDelay;
- }
-
public boolean isLoopbackMode() {
return loopbackMode;
}
@@ -556,31 +257,6 @@ public class MulticastDiscoveryAgent imp
public void setLoopbackMode(boolean loopbackMode) {
this.loopbackMode = loopbackMode;
}
-
- public int getMaxMissedHeartbeats() {
- return maxMissedHeartbeats;
- }
-
- public void setMaxMissedHeartbeats(int maxMissedHeartbeats) {
- this.maxMissedHeartbeats = maxMissedHeartbeats;
- }
-
- public int getMaxReconnectAttempts() {
- return maxReconnectAttempts;
- }
-
- public void setMaxReconnectAttempts(int maxReconnectAttempts) {
- this.maxReconnectAttempts = maxReconnectAttempts;
- }
-
- public long getMaxReconnectDelay() {
- return maxReconnectDelay;
- }
-
- public void setMaxReconnectDelay(long maxReconnectDelay) {
- this.maxReconnectDelay = maxReconnectDelay;
- }
-
public int getTimeToLive() {
return timeToLive;
}