You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2012/03/08 19:59:00 UTC
svn commit: r1298511 - in
/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery:
MultipointDiscoveryAgent.java MultipointServer.java
Author: dblevins
Date: Thu Mar 8 18:59:00 2012
New Revision: 1298511
URL: http://svn.apache.org/viewvc?rev=1298511&view=rev
Log:
OPENEJB-1794 - Multipoint Automatic Reconnect
OPENEJB-1793 - Multipoint.reconnectDelay configures how long to wait between attempts to rejoin the multipoint network
OPENEJB-1789 - Multipoint.discoveryHost allows for "bind" of 0.0.0.0
Modified:
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1298511&r1=1298510&r2=1298511&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Thu Mar 8 18:59:00 2012
@@ -21,6 +21,7 @@ import org.apache.openejb.server.ServerS
import org.apache.openejb.server.ServiceException;
import org.apache.openejb.server.DiscoveryAgent;
import org.apache.openejb.server.DiscoveryListener;
+import org.apache.openejb.util.Duration;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
import org.apache.openejb.loader.Options;
@@ -31,8 +32,9 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URI;
-import java.util.Map;
+import java.util.LinkedHashSet;
import java.util.Properties;
+import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -57,6 +59,8 @@ public class MultipointDiscoveryAgent im
private boolean debug = true;
private String name;
private String discoveryHost;
+ private Set<URI> roots;
+ private Duration reconnectDelay;
public MultipointDiscoveryAgent() {
}
@@ -68,7 +72,7 @@ public class MultipointDiscoveryAgent im
public void init(Properties props) {
- Options options = new Options(props);
+ final Options options = new Options(props);
options.setLogger(new OptionsLog(log));
host = props.getProperty("bind", host);
@@ -77,7 +81,18 @@ public class MultipointDiscoveryAgent im
heartRate = options.get("heart_rate", heartRate);
discoveryHost = options.get("discoveryHost", host);
name = options.get("discoveryName", MultipointServer.randomColor());
+ reconnectDelay = options.get("reconnectDelay", new Duration("30 seconds"));
+ final Set<URI> uris = new LinkedHashSet<URI>();
+
+ // Connect the initial set of peer servers
+ StringTokenizer st = new StringTokenizer(initialServers, ",");
+ while (st.hasMoreTokens()) {
+ URI uri = URI.create("conn://" + st.nextToken().trim());
+ uris.add(uri);
+ }
+
+ roots = uris;
Tracker.Builder builder = new Tracker.Builder();
builder.setHeartRate(heartRate);
@@ -136,15 +151,9 @@ public class MultipointDiscoveryAgent im
try {
if (running.compareAndSet(false, true)) {
- multipointServer = new MultipointServer(host, discoveryHost, port, tracker, name, debug).start();
+ multipointServer = new MultipointServer(host, discoveryHost, port, tracker, name, debug, roots, reconnectDelay).start();
this.port = multipointServer.getPort();
-
- // Connect the initial set of peer servers
- StringTokenizer st = new StringTokenizer(initialServers, ",");
- while (st.hasMoreTokens()) {
- multipointServer.connect(URI.create("conn://"+st.nextToken().trim()));
- }
}
} catch (Exception e) {
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1298511&r1=1298510&r2=1298511&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Thu Mar 8 18:59:00 2012
@@ -16,6 +16,7 @@
*/
package org.apache.openejb.server.discovery;
+import org.apache.openejb.util.Duration;
import org.apache.openejb.util.Join;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
@@ -37,9 +38,11 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -59,50 +62,62 @@ public class MultipointServer {
private final int port;
private final Selector selector;
private final URI me;
+ private final Set<URI> roots = new LinkedHashSet<URI>();
/**
* Only used for toString to make debugging easier
*/
private final String name;
-
private final Tracker tracker;
private final LinkedList<URI> connect = new LinkedList<URI>();
private final Map<URI, Session> connections = new HashMap<URI, Session>();
private boolean debug = true;
- public MultipointServer(int port, Tracker tracker) throws IOException {
- this("localhost", port, tracker);
- }
+ private long joined = 0;
+ private long reconnectDelay;
- public MultipointServer(String host, int port, Tracker tracker) throws IOException {
- this(host, port, tracker, randomColor());
+ public MultipointServer(int port, Tracker tracker) throws IOException {
+ this("localhost", "localhost", port, tracker, randomColor(), true, Collections.EMPTY_SET, new Duration(30, TimeUnit.SECONDS));
}
- public MultipointServer(String host, int port, Tracker tracker, String name) throws IOException {
- this(host, port, tracker, name, true);
- }
+ public MultipointServer(String bindHost, String broadcastHost, int port, Tracker tracker, String name, boolean debug, Set<URI> roots, Duration reconnectDelay) throws IOException {
+ if (tracker == null) throw new NullPointerException("tracker cannot be null");
+ if (bindHost == null) throw new NullPointerException("host cannot be null");
- public MultipointServer(String host, int port, Tracker tracker, String name, boolean debug) throws IOException {
- this(host, host, port, tracker, name, debug);
- }
+ if (broadcastHost == null) broadcastHost = bindHost;
+ if (reconnectDelay == null) reconnectDelay = new Duration(30, TimeUnit.SECONDS);
- public MultipointServer(String bindHost, String broadcastHost, int port, Tracker tracker, String name, boolean debug) throws IOException {
- if (tracker == null) throw new NullPointerException("tracker cannot be null");
this.tracker = tracker;
this.name = name;
this.debug = debug;
- String format = String.format("MultipointServer(bindHost=%s, discoveryHost=%s, port=%s, name=%s, debug=%s)", bindHost, broadcastHost, port, name, debug);
- log.info(format);
- ServerSocketChannel serverChannel = ServerSocketChannel.open();
+ if (roots != null) {
+ this.roots.addAll(roots);
+ }
- ServerSocket serverSocket = serverChannel.socket();
- InetSocketAddress address = new InetSocketAddress(bindHost, port);
- serverSocket.bind(address);
+ this.reconnectDelay = reconnectDelay.getTime(TimeUnit.NANOSECONDS);
+
+ final String format = String.format("MultipointServer(bindHost=%s, discoveryHost=%s, port=%s, name=%s, debug=%s, roots=%s, reconnectDelay='%s')",
+ bindHost,
+ broadcastHost,
+ port,
+ name,
+ debug,
+ this.roots.size(),
+ reconnectDelay.toString());
+
+ log.debug(format);
+
+ final InetSocketAddress address = new InetSocketAddress(bindHost, port);
+
+ final ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
+ final ServerSocket serverSocket = serverChannel.socket();
+ serverSocket.bind(address);
this.port = serverSocket.getLocalPort();
+
if (name != null) {
me = URI.create("conn://" + broadcastHost + ":" + this.port + "/" + name);
} else {
@@ -120,6 +135,23 @@ public class MultipointServer {
return port;
}
+ /**
+ * Attempt to connect back to the network if
+ * - We aren't already connected
+ * - We aren't already attempting to connect
+ * - It has been a while since we last tried (reconnectDelay)
+ */
+ private void rejoin() {
+ if (connections.size() > 0) return;
+ if (connect.size() > 0) return;
+ if (System.nanoTime() - joined <= reconnectDelay) return;
+
+ for (URI root : roots) {
+ connect(root);
+ }
+
+ this.joined = System.nanoTime();
+ }
public MultipointServer start() {
if (running.compareAndSet(false, true)) {
Thread thread = new Thread(new Runnable() {
@@ -380,8 +412,14 @@ public class MultipointServer {
// Here is where we actually will expire missing services
tracker.checkServices();
+ // Fill the pending 'connect' list with the root servers if we are fully disconnected
+ rejoin();
+
+ // Connect to anyone in the pending 'connect' list
initiateConnections();
+ // Adjust selector timeout so we execute in even increments
+ // This keeps the heartbeat and rejoin regular
selectorTimeout = adjustedSelectorTimeout(start);
}
}
@@ -715,7 +753,7 @@ public class MultipointServer {
connect(URI.create("conn://localhost:" + port));
}
- public void connect(URI uri) throws Exception {
+ public void connect(URI uri) {
if (me.equals(uri)) return;
synchronized (connect) {