You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/12 18:57:02 UTC

svn commit: r933322 - /openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java

Author: dblevins
Date: Mon Apr 12 16:57:01 2010
New Revision: 933322

URL: http://svn.apache.org/viewvc?rev=933322&view=rev
Log:
Reworked as part of previous commit introducing a TCP-based alternative to multicast.

Modified:
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java

Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java?rev=933322&r1=933321&r2=933322&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java Mon Apr 12 16:57:01 2010
@@ -35,17 +35,9 @@ import java.net.MulticastSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
 import java.util.Properties;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -63,52 +55,29 @@ public class MulticastDiscoveryAgent imp
     private int timeToLive = 1;
     private boolean loopbackMode = false;
     private InetSocketAddress address;
-
-    private Map<String, Service> registeredServices = new ConcurrentHashMap<String, Service>();
-
-    private String group = "default";
-    private String groupPrefix = group + ":";
-
-    private int maxMissedHeartbeats = 10;
     private long heartRate = 500;
 
     private Tracker tracker;
     private Multicast multicast;
 
-    public MulticastDiscoveryAgent() {
-        tracker = new Tracker();
-    }
-
-    // ---------------------------------
-    // Listenting specific settings
-    private long reconnectDelay = 1000 * 5;
-    private long maxReconnectDelay = 1000 * 30;
-    private long exponentialBackoff = 0;
-    private boolean useExponentialBackOff;
-    private int maxReconnectAttempts = 10; // todo: check this out
-    // ---------------------------------
-
-
-    public void init(Properties props) throws Exception {
-
-        host = props.getProperty("bind", host);
-        group = props.getProperty("group", group);
-        groupPrefix = group + ":";
+    public void init(Properties props) {
 
         Options options = new Options(props);
-
+        host = props.getProperty("bind", host);
+        loopbackMode = options.get("loopback_mode", loopbackMode);
         port = options.get("port", port);
-
         heartRate = options.get("heart_rate", heartRate);
-        maxMissedHeartbeats = options.get("max_missed_heartbeats", maxMissedHeartbeats);
-        loopbackMode = options.get("loopback_mode", loopbackMode);
 
-        reconnectDelay = options.get("reconnect_delay", reconnectDelay);
-        maxReconnectDelay = options.get("max_reconnect_delay", reconnectDelay);
-        maxReconnectAttempts = options.get("max_reconnect_attempts", maxReconnectAttempts);
-        exponentialBackoff = options.get("exponential_backoff", exponentialBackoff);
 
-        useExponentialBackOff = (exponentialBackoff > 1);
+        Tracker.Builder builder = new Tracker.Builder();
+        builder.setGroup(props.getProperty("group", builder.getGroup()));
+        builder.setMaxMissedHeartbeats(options.get("max_missed_heartbeats", builder.getMaxMissedHeartbeats()));
+        builder.setMaxReconnectDelay(options.get("max_reconnect_delay", builder.getMaxReconnectDelay()));
+        builder.setReconnectDelay(options.get("reconnect_delay", builder.getReconnectDelay()));
+        builder.setExponentialBackoff(options.get("exponential_backoff", builder.getExponentialBackoff()));
+        builder.setMaxReconnectAttempts(options.get("max_reconnect_attempts", builder.getMaxReconnectAttempts()));
+
+        tracker = builder.build();
     }
 
     public String getIP() {
@@ -128,30 +97,17 @@ public class MulticastDiscoveryAgent imp
     }
 
     public void registerService(URI serviceUri) throws IOException {
-        Service service = new Service(serviceUri);
-        this.registeredServices.put(service.broadcastString, service);
-        this.tracker.fireServiceAddedEvent(serviceUri);
+        tracker.registerService(serviceUri);
     }
 
     public void unregisterService(URI serviceUri) throws IOException {
-        Service service = new Service(serviceUri);
-        this.registeredServices.remove(service.broadcastString);
-        this.tracker.fireServiceRemovedEvent(serviceUri);
+        tracker.unregisterService(serviceUri);
     }
 
-    public void reportFailed(URI serviceUri) throws IOException {
+    public void reportFailed(URI serviceUri) {
         tracker.reportFailed(serviceUri);
     }
 
-
-    private boolean isSelf(Service service) {
-        return isSelf(service.broadcastString);
-    }
-
-    private boolean isSelf(String service) {
-        return registeredServices.keySet().contains(service);
-    }
-
     public static void main(String[] args) throws Exception {
     }
 
@@ -191,117 +147,6 @@ public class MulticastDiscoveryAgent imp
     public void service(Socket socket) throws ServiceException, IOException {
     }
 
-    class Service {
-        private final URI uri;
-        private final String broadcastString;
-
-        public Service(URI uri) {
-            this.uri = uri;
-            this.broadcastString = groupPrefix + uri.toString();
-        }
-
-        public Service(String uriString) throws URISyntaxException {
-            URI uri = new URI(uriString);
-            uri = new URI(uri.getSchemeSpecificPart());
-            this.uri = uri;
-            this.broadcastString = uriString;
-        }
-    }
-
-    private class ServiceVitals {
-
-        private final Service service;
-
-        private long lastHeartBeat;
-        private long recoveryTime;
-        private int failureCount;
-        private boolean dead;
-
-        public ServiceVitals(Service service) {
-            this.service = service;
-            this.lastHeartBeat = System.currentTimeMillis();
-        }
-
-        public synchronized void heartbeat() {
-            lastHeartBeat = System.currentTimeMillis();
-
-            // Consider that the service recovery has succeeded if it has not
-            // failed in 60 seconds.
-            if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
-                if (log.isDebugEnabled()) {
-                    log.debug("I now think that the " + service + " service has recovered.");
-                }
-                failureCount = 0;
-                recoveryTime = 0;
-            }
-        }
-
-        public synchronized long getLastHeartbeat() {
-            return lastHeartBeat;
-        }
-
-        public synchronized boolean pronounceDead() {
-            if (!dead) {
-                dead = true;
-                failureCount++;
-
-                long delay;
-                if (useExponentialBackOff) {
-                    delay = (long) Math.pow(exponentialBackoff, failureCount);
-                    if (delay > maxReconnectDelay) {
-                        delay = maxReconnectDelay;
-                    }
-                } else {
-                    delay = reconnectDelay;
-                }
-
-                if (log.isDebugEnabled()) {
-                    log.debug("Remote failure of " + service + " while still receiving multicast advertisements.  " +
-                            "Advertising events will be suppressed for " + delay
-                            + " ms, the current failure count is: " + failureCount);
-                }
-
-                recoveryTime = System.currentTimeMillis() + delay;
-                return true;
-            }
-            return false;
-        }
-
-        /**
-         * @return true if this broker is marked failed and it is now the right
-         *         time to start recovery.
-         */
-        public synchronized boolean doRecovery() {
-            if (!dead) {
-                return false;
-            }
-
-            // Are we done trying to recover this guy?
-            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
-                if (log.isDebugEnabled()) {
-                    log.debug("Max reconnect attempts of the " + service + " service has been reached.");
-                }
-                return false;
-            }
-
-            // Is it not yet time?
-            if (System.currentTimeMillis() < recoveryTime) {
-                return false;
-            }
-
-            if (log.isDebugEnabled()) {
-                log.debug("Resuming event advertisement of the " + service + " service.");
-            }
-            dead = false;
-            return true;
-        }
-
-        public boolean isDead() {
-            return dead;
-        }
-    }
-
-
     class Multicast {
 
         private static final int BUFF_SIZE = 8192;
@@ -371,7 +216,7 @@ public class MulticastDiscoveryAgent imp
             }
 
             private void heartbeat() {
-                for (String uri : registeredServices.keySet()) {
+                for (String uri : tracker.getRegisteredServices()) {
                     try {
                         byte[] data = uri.getBytes();
                         DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
@@ -396,142 +241,6 @@ public class MulticastDiscoveryAgent imp
         }
     }
 
-    class Tracker {
-        private Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
-        private DiscoveryListener discoveryListener;
-
-        public void setDiscoveryListener(DiscoveryListener discoveryListener) {
-            this.discoveryListener = discoveryListener;
-        }
-
-        private void processData(String uriString) {
-            if (discoveryListener == null) {
-                return;
-            }
-
-            if (!uriString.startsWith(groupPrefix)){
-                return;
-            }
-
-            if (isSelf(uriString)) {
-                return;
-            }
-
-            ServiceVitals vitals = discoveredServices.get(uriString);
-
-            if (vitals == null) {
-                try {
-                    vitals = new ServiceVitals(new Service(uriString));
-
-                    discoveredServices.put(uriString, vitals);
-
-                    fireServiceAddedEvent(vitals.service.uri);
-                } catch (URISyntaxException e) {
-                    // don't continuously log this
-                }
-
-            } else {
-                vitals.heartbeat();
-
-                if (vitals.doRecovery()) {
-                    fireServiceAddedEvent(vitals.service.uri);
-                }
-            }
-        }
-
-        private void checkServices() {
-            long expireTime = System.currentTimeMillis() - (heartRate * maxMissedHeartbeats);
-            for (ServiceVitals serviceVitals : discoveredServices.values()) {
-                if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
-
-                    ServiceVitals vitals = discoveredServices.remove(serviceVitals.service.broadcastString);
-                    if (vitals != null && !vitals.isDead()) {
-                        fireServiceRemovedEvent(vitals.service.uri);
-                    }
-                }
-            }
-        }
-
-        private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
-            public Thread newThread(Runnable runable) {
-                Thread t = new Thread(runable, "Discovery Agent Notifier");
-                t.setDaemon(true);
-                return t;
-            }
-        });
-
-        private void fireServiceRemovedEvent(final URI uri) {
-            if (discoveryListener != null) {
-                final DiscoveryListener discoveryListener = this.discoveryListener;
-
-                // Have the listener process the event async so that
-                // he does not block this thread since we are doing time sensitive
-                // processing of events.
-                executor.execute(new Runnable() {
-                    public void run() {
-                        if (discoveryListener != null) {
-                            discoveryListener.serviceRemoved(uri);
-                        }
-                    }
-                });
-            }
-        }
-
-        private void fireServiceAddedEvent(final URI uri) {
-            if (discoveryListener != null) {
-                final DiscoveryListener discoveryListener = this.discoveryListener;
-
-                // Have the listener process the event async so that
-                // he does not block this thread since we are doing time sensitive
-                // processing of events.
-                executor.execute(new Runnable() {
-                    public void run() {
-                        if (discoveryListener != null) {
-                            discoveryListener.serviceAdded(uri);
-                        }
-                    }
-                });
-            }
-        }
-
-        public void reportFailed(URI serviceUri) {
-            final Service service = new Service(serviceUri);
-            ServiceVitals serviceVitals = discoveredServices.get(service.broadcastString);
-            if (serviceVitals != null && serviceVitals.pronounceDead()) {
-                fireServiceRemovedEvent(service.uri);
-            }
-        }
-    }
-
-    //
-    //  Ordinary getters/setters
-    //
-
-    public long getExponentialBackoff() {
-        return exponentialBackoff;
-    }
-
-    public void setExponentialBackoff(long exponentialBackoff) {
-        this.exponentialBackoff = exponentialBackoff;
-        this.useExponentialBackOff = (exponentialBackoff > 1);
-    }
-
-    public String getGroup() {
-        return group;
-    }
-
-    public void setGroup(String group) {
-        this.group = group;
-        groupPrefix = group + ":";
-    }
-
-    public long getHeartRate() {
-        return heartRate;
-    }
-
-    public void setHeartRate(long heartRate) {
-        this.heartRate = heartRate;
-    }
 
     public String getHost() {
         return host;
@@ -541,14 +250,6 @@ public class MulticastDiscoveryAgent imp
         this.host = host;
     }
 
-    public long getReconnectDelay() {
-        return reconnectDelay;
-    }
-
-    public void setReconnectDelay(long reconnectDelay) {
-        this.reconnectDelay = reconnectDelay;
-    }
-
     public boolean isLoopbackMode() {
         return loopbackMode;
     }
@@ -556,31 +257,6 @@ public class MulticastDiscoveryAgent imp
     public void setLoopbackMode(boolean loopbackMode) {
         this.loopbackMode = loopbackMode;
     }
-
-    public int getMaxMissedHeartbeats() {
-        return maxMissedHeartbeats;
-    }
-
-    public void setMaxMissedHeartbeats(int maxMissedHeartbeats) {
-        this.maxMissedHeartbeats = maxMissedHeartbeats;
-    }
-
-    public int getMaxReconnectAttempts() {
-        return maxReconnectAttempts;
-    }
-
-    public void setMaxReconnectAttempts(int maxReconnectAttempts) {
-        this.maxReconnectAttempts = maxReconnectAttempts;
-    }
-
-    public long getMaxReconnectDelay() {
-        return maxReconnectDelay;
-    }
-
-    public void setMaxReconnectDelay(long maxReconnectDelay) {
-        this.maxReconnectDelay = maxReconnectDelay;
-    }
-
     public int getTimeToLive() {
         return timeToLive;
     }