You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2012/03/08 19:59:00 UTC

svn commit: r1298511 - in /openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery: MultipointDiscoveryAgent.java MultipointServer.java

Author: dblevins
Date: Thu Mar  8 18:59:00 2012
New Revision: 1298511

URL: http://svn.apache.org/viewvc?rev=1298511&view=rev
Log:
OPENEJB-1794 - Multipoint Automatic Reconnect
OPENEJB-1793 - Multipoint.reconnectDelay configures how log to wait between attempts to rejoin the multipoint network
OPENEJB-1789 - Multipoint.discoveryHost allows for "bind" of 0.0.0.0


Modified:
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1298511&r1=1298510&r2=1298511&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Thu Mar  8 18:59:00 2012
@@ -21,6 +21,7 @@ import org.apache.openejb.server.ServerS
 import org.apache.openejb.server.ServiceException;
 import org.apache.openejb.server.DiscoveryAgent;
 import org.apache.openejb.server.DiscoveryListener;
+import org.apache.openejb.util.Duration;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
 import org.apache.openejb.loader.Options;
@@ -31,8 +32,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.URI;
-import java.util.Map;
+import java.util.LinkedHashSet;
 import java.util.Properties;
+import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -57,6 +59,8 @@ public class MultipointDiscoveryAgent im
     private boolean debug = true;
     private String name;
     private String discoveryHost;
+    private Set<URI> roots;
+    private Duration reconnectDelay;
 
     public MultipointDiscoveryAgent() {
     }
@@ -68,7 +72,7 @@ public class MultipointDiscoveryAgent im
 
     public void init(Properties props) {
 
-        Options options = new Options(props);
+        final Options options = new Options(props);
         options.setLogger(new OptionsLog(log));
 
         host = props.getProperty("bind", host);
@@ -77,7 +81,18 @@ public class MultipointDiscoveryAgent im
         heartRate = options.get("heart_rate", heartRate);
         discoveryHost = options.get("discoveryHost", host);
         name = options.get("discoveryName", MultipointServer.randomColor());
+        reconnectDelay = options.get("reconnectDelay", new Duration("30 seconds"));
 
+        final Set<URI> uris = new LinkedHashSet<URI>();
+
+        // Connect the initial set of peer servers
+        StringTokenizer st = new StringTokenizer(initialServers, ",");
+        while (st.hasMoreTokens()) {
+            URI uri = URI.create("conn://" + st.nextToken().trim());
+            uris.add(uri);
+        }
+
+        roots = uris;
 
         Tracker.Builder builder = new Tracker.Builder();
         builder.setHeartRate(heartRate);
@@ -136,15 +151,9 @@ public class MultipointDiscoveryAgent im
         try {
             if (running.compareAndSet(false, true)) {
 
-                multipointServer = new MultipointServer(host, discoveryHost, port, tracker, name, debug).start();
+                multipointServer = new MultipointServer(host, discoveryHost, port, tracker, name, debug, roots, reconnectDelay).start();
 
                 this.port = multipointServer.getPort();
-                
-                // Connect the initial set of peer servers
-                StringTokenizer st = new StringTokenizer(initialServers, ",");
-                while (st.hasMoreTokens()) {
-                    multipointServer.connect(URI.create("conn://"+st.nextToken().trim()));
-                }
 
             }
         } catch (Exception e) {

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1298511&r1=1298510&r2=1298511&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Thu Mar  8 18:59:00 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.openejb.server.discovery;
 
+import org.apache.openejb.util.Duration;
 import org.apache.openejb.util.Join;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
@@ -37,9 +38,11 @@ import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -59,50 +62,62 @@ public class MultipointServer {
     private final int port;
     private final Selector selector;
     private final URI me;
+    private final Set<URI> roots = new LinkedHashSet<URI>();
 
     /**
      * Only used for toString to make debugging easier
      */
     private final String name;
 
-
     private final Tracker tracker;
 
     private final LinkedList<URI> connect = new LinkedList<URI>();
     private final Map<URI, Session> connections = new HashMap<URI, Session>();
     private boolean debug = true;
 
-    public MultipointServer(int port, Tracker tracker) throws IOException {
-        this("localhost", port, tracker);
-    }
+    private long joined = 0;
+    private long reconnectDelay;
 
-    public MultipointServer(String host, int port, Tracker tracker) throws IOException {
-        this(host, port, tracker, randomColor());
+    public MultipointServer(int port, Tracker tracker) throws IOException {
+        this("localhost", "localhost", port, tracker, randomColor(), true, Collections.EMPTY_SET, new Duration(30, TimeUnit.SECONDS));
     }
 
-    public MultipointServer(String host, int port, Tracker tracker, String name) throws IOException {
-        this(host, port, tracker, name, true);
-    }
+    public MultipointServer(String bindHost, String broadcastHost, int port, Tracker tracker, String name, boolean debug, Set<URI> roots, Duration reconnectDelay) throws IOException {
+        if (tracker == null) throw new NullPointerException("tracker cannot be null");
+        if (bindHost == null) throw new NullPointerException("host cannot be null");
 
-    public MultipointServer(String host, int port, Tracker tracker, String name, boolean debug) throws IOException {
-        this(host, host, port, tracker, name, debug);
-    }
+        if (broadcastHost == null) broadcastHost = bindHost;
+        if (reconnectDelay == null) reconnectDelay = new Duration(30, TimeUnit.SECONDS);
 
-    public MultipointServer(String bindHost, String broadcastHost, int port, Tracker tracker, String name, boolean debug) throws IOException {
-        if (tracker == null) throw new NullPointerException("tracker cannot be null");
         this.tracker = tracker;
         this.name = name;
         this.debug = debug;
-        String format = String.format("MultipointServer(bindHost=%s, discoveryHost=%s, port=%s, name=%s, debug=%s)", bindHost, broadcastHost, port, name, debug);
-        log.info(format);
-        ServerSocketChannel serverChannel = ServerSocketChannel.open();
+        if (roots != null) {
+            this.roots.addAll(roots);
+        }
 
-        ServerSocket serverSocket = serverChannel.socket();
-        InetSocketAddress address = new InetSocketAddress(bindHost, port);
-        serverSocket.bind(address);
+        this.reconnectDelay = reconnectDelay.getTime(TimeUnit.NANOSECONDS);
+
+        final String format = String.format("MultipointServer(bindHost=%s, discoveryHost=%s, port=%s, name=%s, debug=%s, roots=%s, reconnectDelay='%s')",
+                bindHost,
+                broadcastHost,
+                port,
+                name,
+                debug,
+                this.roots.size(),
+                reconnectDelay.toString());
+
+        log.debug(format);
+
+        final InetSocketAddress address = new InetSocketAddress(bindHost, port);
+
+        final ServerSocketChannel serverChannel = ServerSocketChannel.open();
         serverChannel.configureBlocking(false);
 
+        final ServerSocket serverSocket = serverChannel.socket();
+        serverSocket.bind(address);
         this.port = serverSocket.getLocalPort();
+
         if (name != null) {
             me = URI.create("conn://" + broadcastHost + ":" + this.port + "/" + name);
         } else {
@@ -120,6 +135,23 @@ public class MultipointServer {
         return port;
     }
 
+    /**
+     * Attempt to connect back to the network if
+     *  - We aren't already connected
+     *  - We aren't already attempting to connect
+     *  - It has been a while since we last tried (reconnectDelay)
+     */
+    private void rejoin() {
+        if (connections.size() > 0) return;
+        if (connect.size() > 0) return;
+        if (System.nanoTime() - joined <= reconnectDelay) return;
+
+        for (URI root : roots) {
+            connect(root);
+        }
+
+        this.joined = System.nanoTime();
+    }
     public MultipointServer start() {
         if (running.compareAndSet(false, true)) {
             Thread thread = new Thread(new Runnable() {
@@ -380,8 +412,14 @@ public class MultipointServer {
             // Here is where we actually will expire missing services
             tracker.checkServices();
 
+            // Fill 'connections' list if we are fully disconnected
+            rejoin();
+
+            // Connect to anyone in the 'connections' list
             initiateConnections();
 
+            // Adjust selector timeout so we execute in even increments
+            // This keeps the heartbeat and rejoin regular
             selectorTimeout = adjustedSelectorTimeout(start);
         }
     }
@@ -715,7 +753,7 @@ public class MultipointServer {
         connect(URI.create("conn://localhost:" + port));
     }
 
-    public void connect(URI uri) throws Exception {
+    public void connect(URI uri) {
         if (me.equals(uri)) return;
 
         synchronized (connect) {