You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/09 01:58:57 UTC

svn commit: r932180 - in /openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery: MultipointDiscoveryAgent.java Tracker.java

Author: dblevins
Date: Thu Apr  8 23:58:57 2010
New Revision: 932180

URL: http://svn.apache.org/viewvc?rev=932180&view=rev
Log:
More refactoring to try to isolate the multicast-specific bits.  Aiming for something usable for both UDP and TCP "heartbeating".

Added:
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
      - copied, changed from r915514, openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java   (with props)

Copied: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (from r915514, openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java)
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?p2=openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java&p1=openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java&r1=915514&r2=932180&rev=932180&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MulticastDiscoveryAgent.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Thu Apr  8 23:58:57 2010
@@ -50,11 +50,13 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ * NIO-based, asynchronous, multipoint (peer-to-peer) web of nodes that broadcast a heartbeat
+ *
  * @version $Rev$ $Date$
  */
-public class MulticastDiscoveryAgent implements DiscoveryAgent, ServerService, SelfManaging {
+public class MultipointDiscoveryAgent implements DiscoveryAgent, ServerService, SelfManaging {
 
-    private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MulticastDiscoveryAgent.class);
+    private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointDiscoveryAgent.class);
 
     private static final int BUFF_SIZE = 8192;
 
@@ -79,7 +81,7 @@ public class MulticastDiscoveryAgent imp
 
     private Listener listener;
 
-    public MulticastDiscoveryAgent() {
+    public MultipointDiscoveryAgent() {
         listener = new Listener();
     }
 
@@ -571,4 +573,4 @@ public class MulticastDiscoveryAgent imp
         this.timeToLive = timeToLive;
     }
 
-}
+}
\ No newline at end of file

Added: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java?rev=932180&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java (added)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java Thu Apr  8 23:58:57 2010
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import org.apache.openejb.server.DiscoveryListener;
+import org.apache.openejb.util.Logger;
+import org.apache.openejb.util.LogCategory;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.io.IOException;
+
+/**
+ * Keeps track of the services this node advertises and of the services it has
+ * learned about from the heartbeats of its peers, and notifies a
+ * {@link DiscoveryListener} when a peer service appears or disappears.
+ *
+ * <p>The tracker itself performs no network I/O: the transport hands every
+ * received heartbeat to {@link #processData(String)} and periodically calls
+ * {@link #checkServices()} to expire peers that have gone silent.  Listener
+ * callbacks are dispatched on a dedicated daemon thread so that a slow listener
+ * cannot delay heartbeat processing.</p>
+ *
+ * @version $Rev$ $Date$
+ */
+public class Tracker {
+
+    private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), Tracker.class);
+
+    private final String group;
+    /** The group name followed by ':'; every heartbeat of this group starts with it. */
+    private final String groupPrefix;
+    /** Interval between heartbeats, in milliseconds (compared against System.currentTimeMillis()). */
+    private final long heartRate;
+    private final int maxMissedHeartbeats;
+    private final long reconnectDelay;
+    private final long maxReconnectDelay;
+    private final int maxReconnectAttempts;
+    private final long exponentialBackoff;
+    private final boolean useExponentialBackOff;
+
+    /** Services advertised by this node, keyed by their broadcast string. */
+    private final Map<String, Service> registeredServices = new ConcurrentHashMap<String, Service>();
+
+    /** Services learned from peers, keyed by their broadcast string. */
+    private final Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
+
+    private DiscoveryListener discoveryListener;
+
+    /** Single daemon thread on which all listener callbacks are run, in submission order. */
+    private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+        public Thread newThread(Runnable runable) {
+            Thread t = new Thread(runable, "Discovery Agent Notifier");
+            t.setDaemon(true);
+            return t;
+        }
+    });
+
+    /**
+     * @param group                name of the discovery group; heartbeats not starting with
+     *                             <code>group + ":"</code> are ignored
+     * @param heartRate            heartbeat interval in milliseconds
+     * @param maxMissedHeartbeats  number of heartbeat intervals a peer may stay silent before
+     *                             {@link #checkServices()} removes it
+     * @param reconnectDelay       suppression delay in milliseconds applied after a reported
+     *                             failure when exponential back-off is not in use
+     * @param maxReconnectDelay    upper bound in milliseconds for the exponential back-off delay
+     * @param maxReconnectAttempts number of failures after which a service is no longer
+     *                             re-advertised; a value of zero or less disables the limit
+     * @param exponentialBackoff   base of the exponential back-off; back-off is only used when
+     *                             this value is greater than 1
+     */
+    public Tracker(String group, long heartRate, int maxMissedHeartbeats, long reconnectDelay, long maxReconnectDelay, int maxReconnectAttempts, long exponentialBackoff) {
+        this.group = group;
+        this.groupPrefix = group + ":";
+
+        this.heartRate = heartRate;
+        this.maxMissedHeartbeats = maxMissedHeartbeats;
+        this.reconnectDelay = reconnectDelay;
+        this.maxReconnectDelay = maxReconnectDelay;
+        this.maxReconnectAttempts = maxReconnectAttempts;
+        this.exponentialBackoff = exponentialBackoff;
+        this.useExponentialBackOff = exponentialBackoff > 1;
+    }
+
+    /**
+     * Sets the listener that receives service added/removed events.  While no
+     * listener is set, incoming heartbeats are ignored and no events are fired.
+     */
+    public void setDiscoveryListener(DiscoveryListener discoveryListener) {
+        this.discoveryListener = discoveryListener;
+    }
+
+    /**
+     * @return the broadcast strings (<code>group:uri</code>) of the locally registered
+     *         services.  This is a live view of the internal map, not a copy.
+     */
+    public Set<String> getRegisteredServices() {
+        return registeredServices.keySet();
+    }
+
+    /**
+     * Registers a local service so that it is reported by {@link #getRegisteredServices()}
+     * and recognised as "self" when its heartbeat is received, then fires a
+     * service-added event for it.
+     */
+    public void registerService(URI serviceUri) throws IOException {
+        Service service = new Service(serviceUri);
+        this.registeredServices.put(service.broadcastString, service);
+        fireServiceAddedEvent(serviceUri);
+    }
+
+    /**
+     * Removes a previously registered local service and fires a service-removed
+     * event for it.
+     */
+    public void unregisterService(URI serviceUri) throws IOException {
+        Service service = new Service(serviceUri);
+        this.registeredServices.remove(service.broadcastString);
+        fireServiceRemovedEvent(serviceUri);
+    }
+
+    private boolean isSelf(Service service) {
+        return isSelf(service.broadcastString);
+    }
+
+    private boolean isSelf(String service) {
+        return registeredServices.containsKey(service);
+    }
+
+    /**
+     * Processes one received heartbeat.
+     *
+     * <p>The heartbeat is ignored when no listener is set, when it is null, when it
+     * does not belong to this tracker's group, or when it is one of our own
+     * registered services.  The first heartbeat of an unknown service fires a
+     * service-added event; later heartbeats refresh the service's vitals and
+     * re-fire the added event once a service reported as failed is due for
+     * recovery.</p>
+     *
+     * @param uriString the heartbeat payload, expected in the form <code>group:uri</code>
+     */
+    public void processData(String uriString) {
+        if (discoveryListener == null) {
+            return;
+        }
+
+        // A null payload is treated like any other unrecognised packet
+        // instead of failing with a NullPointerException.
+        if (uriString == null || !uriString.startsWith(groupPrefix)) {
+            return;
+        }
+
+        if (isSelf(uriString)) {
+            return;
+        }
+
+        ServiceVitals vitals = discoveredServices.get(uriString);
+
+        if (vitals == null) {
+            try {
+                vitals = new ServiceVitals(new Service(uriString));
+
+                discoveredServices.put(uriString, vitals);
+
+                fireServiceAddedEvent(vitals.service.uri);
+            } catch (URISyntaxException e) {
+                // Deliberately ignored: don't continuously log this, a malformed
+                // heartbeat would be reported again each time it is received.
+            }
+
+        } else {
+            vitals.heartbeat();
+
+            if (vitals.doRecovery()) {
+                fireServiceAddedEvent(vitals.service.uri);
+            }
+        }
+    }
+
+    /**
+     * Removes every discovered service whose last heartbeat is older than
+     * <code>heartRate * maxMissedHeartbeats</code> milliseconds.  A service-removed
+     * event is fired unless the service was already pronounced dead, in which
+     * case the event was fired by {@link #reportFailed(URI)}.
+     */
+    public void checkServices() {
+        long expireTime = System.currentTimeMillis() - (heartRate * maxMissedHeartbeats);
+        for (ServiceVitals serviceVitals : discoveredServices.values()) {
+            if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
+
+                ServiceVitals vitals = discoveredServices.remove(serviceVitals.service.broadcastString);
+                if (vitals != null && !vitals.isDead()) {
+                    fireServiceRemovedEvent(vitals.service.uri);
+                }
+            }
+        }
+    }
+
+    /**
+     * Asynchronously tells the listener, if any, that the service is gone.
+     */
+    private void fireServiceRemovedEvent(final URI uri) {
+        // Read the field once so the null check and the callback use the same listener.
+        final DiscoveryListener listener = this.discoveryListener;
+        if (listener == null) {
+            return;
+        }
+
+        // Have the listener process the event async so that
+        // it does not block this thread since we are doing time sensitive
+        // processing of events.
+        executor.execute(new Runnable() {
+            public void run() {
+                listener.serviceRemoved(uri);
+            }
+        });
+    }
+
+    /**
+     * Asynchronously tells the listener, if any, that the service is available.
+     */
+    private void fireServiceAddedEvent(final URI uri) {
+        // Read the field once so the null check and the callback use the same listener.
+        final DiscoveryListener listener = this.discoveryListener;
+        if (listener == null) {
+            return;
+        }
+
+        // Have the listener process the event async so that
+        // it does not block this thread since we are doing time sensitive
+        // processing of events.
+        executor.execute(new Runnable() {
+            public void run() {
+                listener.serviceAdded(uri);
+            }
+        });
+    }
+
+    /**
+     * Reports that a discovered service could not be used.  If the service is
+     * known and not already marked dead, it is marked dead, a service-removed
+     * event is fired, and its re-advertisement is suppressed until the
+     * reconnect delay has elapsed.
+     */
+    public void reportFailed(URI serviceUri) {
+        final Service service = new Service(serviceUri);
+        ServiceVitals serviceVitals = discoveredServices.get(service.broadcastString);
+        if (serviceVitals != null && serviceVitals.pronounceDead()) {
+            fireServiceRemovedEvent(service.uri);
+        }
+    }
+
+    /**
+     * A service URI together with the string used to advertise it,
+     * which is the group prefix followed by the URI.
+     */
+    public class Service {
+        private final URI uri;
+        private final String broadcastString;
+
+        /**
+         * Creates a service from its plain URI; the broadcast string is derived
+         * by prepending this tracker's group prefix.
+         */
+        public Service(URI uri) {
+            this.uri = uri;
+            this.broadcastString = groupPrefix + uri.toString();
+        }
+
+        /**
+         * Creates a service from a received broadcast string.  The service URI is
+         * the scheme-specific part of that string, i.e. what follows the first ':'.
+         *
+         * @throws URISyntaxException if the string or its scheme-specific part is not a valid URI
+         */
+        public Service(String uriString) throws URISyntaxException {
+            URI uri = new URI(uriString);
+            uri = new URI(uri.getSchemeSpecificPart());
+            this.uri = uri;
+            this.broadcastString = uriString;
+        }
+
+        /**
+         * @return the service URI, so that log messages name the service rather
+         *         than printing an object identity hash
+         */
+        public String toString() {
+            return uri.toString();
+        }
+    }
+
+    /**
+     * Liveness bookkeeping for one discovered service.  All state is guarded by
+     * this object's monitor.
+     */
+    private class ServiceVitals {
+
+        private final Service service;
+
+        private long lastHeartBeat;
+        private long recoveryTime;
+        private int failureCount;
+        private boolean dead;
+
+        public ServiceVitals(Service service) {
+            this.service = service;
+            this.lastHeartBeat = System.currentTimeMillis();
+        }
+
+        /**
+         * Records that a heartbeat was just received and clears the failure
+         * history once the service has stayed healthy long enough.
+         */
+        public synchronized void heartbeat() {
+            lastHeartBeat = System.currentTimeMillis();
+
+            // Consider that the service recovery has succeeded if it has not
+            // failed in 60 seconds.
+            if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
+                if (log.isDebugEnabled()) {
+                    log.debug("I now think that the " + service + " service has recovered.");
+                }
+                failureCount = 0;
+                recoveryTime = 0;
+            }
+        }
+
+        public synchronized long getLastHeartbeat() {
+            return lastHeartBeat;
+        }
+
+        /**
+         * Marks the service as dead and computes the time before which it must
+         * not be advertised again.
+         *
+         * @return true if the service was alive and is now marked dead, false if
+         *         it was already dead
+         */
+        public synchronized boolean pronounceDead() {
+            if (!dead) {
+                dead = true;
+                failureCount++;
+
+                long delay;
+                if (useExponentialBackOff) {
+                    delay = (long) Math.pow(exponentialBackoff, failureCount);
+                    if (delay > maxReconnectDelay) {
+                        delay = maxReconnectDelay;
+                    }
+                } else {
+                    delay = reconnectDelay;
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Remote failure of " + service + " while still receiving multicast advertisements.  " +
+                            "Advertising events will be suppressed for " + delay
+                            + " ms, the current failure count is: " + failureCount);
+                }
+
+                recoveryTime = System.currentTimeMillis() + delay;
+                return true;
+            }
+            return false;
+        }
+
+        /**
+         * @return true if this service is marked failed and it is now the right
+         *         time to start recovery; the service is then no longer
+         *         considered dead.
+         */
+        public synchronized boolean doRecovery() {
+            if (!dead) {
+                return false;
+            }
+
+            // Are we done trying to recover this guy?
+            if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Max reconnect attempts of the " + service + " service has been reached.");
+                }
+                return false;
+            }
+
+            // Is it not yet time?
+            if (System.currentTimeMillis() < recoveryTime) {
+                return false;
+            }
+
+            if (log.isDebugEnabled()) {
+                log.debug("Resuming event advertisement of the " + service + " service.");
+            }
+            dead = false;
+            return true;
+        }
+
+        /**
+         * Synchronized like every other accessor of <code>dead</code> so that the
+         * value written by another thread is visible here.
+         */
+        public synchronized boolean isDead() {
+            return dead;
+        }
+    }
+
+
+    /**
+     * Mutable holder for the {@link Tracker} settings; {@link #build()} creates
+     * a tracker from the current values.
+     */
+    public static class Builder {
+        private String group = "default";
+        private int maxMissedHeartbeats = 10;
+        private long heartRate = 500;
+        // ---------------------------------
+        // Listening specific settings
+        private long reconnectDelay = 1000 * 5;
+        private long maxReconnectDelay = 1000 * 30;
+        // Exponential back-off is only enabled for values greater than 1
+        private long exponentialBackoff = 0;
+        private int maxReconnectAttempts = 10; // todo: check this out
+        // ---------------------------------
+
+
+        public long getExponentialBackoff() {
+            return exponentialBackoff;
+        }
+
+        public void setExponentialBackoff(long exponentialBackoff) {
+            this.exponentialBackoff = exponentialBackoff;
+        }
+
+        public String getGroup() {
+            return group;
+        }
+
+        public void setGroup(String group) {
+            this.group = group;
+        }
+
+        public long getHeartRate() {
+            return heartRate;
+        }
+
+        public void setHeartRate(long heartRate) {
+            this.heartRate = heartRate;
+        }
+
+        public long getReconnectDelay() {
+            return reconnectDelay;
+        }
+
+        public void setReconnectDelay(long reconnectDelay) {
+            this.reconnectDelay = reconnectDelay;
+        }
+
+        public int getMaxMissedHeartbeats() {
+            return maxMissedHeartbeats;
+        }
+
+        public void setMaxMissedHeartbeats(int maxMissedHeartbeats) {
+            this.maxMissedHeartbeats = maxMissedHeartbeats;
+        }
+
+        public int getMaxReconnectAttempts() {
+            return maxReconnectAttempts;
+        }
+
+        public void setMaxReconnectAttempts(int maxReconnectAttempts) {
+            this.maxReconnectAttempts = maxReconnectAttempts;
+        }
+
+        public long getMaxReconnectDelay() {
+            return maxReconnectDelay;
+        }
+
+        public void setMaxReconnectDelay(long maxReconnectDelay) {
+            this.maxReconnectDelay = maxReconnectDelay;
+        }
+
+        /**
+         * @return a new tracker configured with the current settings of this builder
+         */
+        public Tracker build() {
+            return new Tracker(group, heartRate, maxMissedHeartbeats, reconnectDelay, maxReconnectDelay, maxReconnectAttempts, exponentialBackoff);
+        }
+    }
+
+}

Propchange: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
------------------------------------------------------------------------------
    svn:eol-style = native