You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2012/11/11 14:04:56 UTC
svn commit: r1407968 - in
/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main:
java/org/apache/openejb/server/discovery/
resources/META-INF/org.apache.openejb.server.ServerService/
Author: dblevins
Date: Sun Nov 11 13:04:55 2012
New Revision: 1407968
URL: http://svn.apache.org/viewvc?rev=1407968&view=rev
Log:
OPENEJB-1794 Multipoint Automatic Reconnect
Modified:
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1407968&r1=1407967&r2=1407968&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Sun Nov 11 13:04:55 2012
@@ -172,7 +172,7 @@ public class MultipointDiscoveryAgent im
try {
if (running.compareAndSet(false, true)) {
log.info("MultipointDiscoveryAgent Starting");
- final boolean broadcast = options.get("broadcast", true);
+ final boolean broadcast = options.get("broadcast", false);
multipointServer = new MultipointServer(host, discoveryHost, port, tracker, name, debug, roots, reconnectDelay, broadcast).start();
log.info("MultipointDiscoveryAgent Started");
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1407968&r1=1407967&r2=1407968&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Sun Nov 11 13:04:55 2012
@@ -27,10 +27,13 @@ import org.apache.openejb.util.Logger;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
+import java.net.SocketAddress;
import java.net.URI;
+import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
@@ -51,7 +54,13 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -86,7 +95,7 @@ public class MultipointServer {
private final Tracker tracker;
- private final LinkedList<URI> connect = new LinkedList<URI>();
+ private final LinkedList<Host> connect = new LinkedList<Host>();
private final Map<URI, Session> connections = new HashMap<URI, Session>();
private long joined = 0;
@@ -100,7 +109,7 @@ public class MultipointServer {
private final Lock lock = new ReentrantLock();
private final Condition started = lock.newCondition();
private final Condition stopped = lock.newCondition();
- private final AtomicBoolean broadcast = new AtomicBoolean(true);
+ private final AtomicBoolean broadcast = new AtomicBoolean(false);
public MultipointServer(int port, Tracker tracker) throws IOException {
this("localhost", "localhost", port, tracker, randomColor(), true, Collections.EMPTY_SET, new Duration(30, TimeUnit.SECONDS), true);
@@ -198,7 +207,14 @@ public class MultipointServer {
}
public List<URI> getConnectionsQueued() {
- return new ArrayList<URI>(connect);
+ synchronized (connect) {
+ final ArrayList<URI> uris = new ArrayList<URI>(connect.size());
+ for (Host host : connect) {
+ uris.add(host.getUri());
+ }
+ return uris;
+ }
+
}
public long getReconnectDelay() {
@@ -229,10 +245,12 @@ public class MultipointServer {
if (System.nanoTime() - joined <= reconnectDelay) return;
for (URI uri : roots) {
+ final Host host = new Host(uri);
synchronized (connect) {
- if (!connections.containsKey(uri) && !connect.contains(uri)) {
+ if (!connections.containsKey(uri) && !connect.contains(host)) {
log.info("Reconnect{uri=" + uri + "}");
- connect.addLast(uri);
+ connect.addLast(host);
+ host.resolveDns();
this.joined = System.nanoTime();
}
}
@@ -582,6 +600,7 @@ public class MultipointServer {
// This keeps the heartbeat and rejoin regular
selectorTimeout = adjustedSelectorTimeout(start);
}
+ log.info("MultipointServer has terminated.");
}
private long adjustedSelectorTimeout(long start) {
@@ -593,25 +612,46 @@ public class MultipointServer {
}
private void initiateConnections() {
+
synchronized (connect) {
+ final LinkedList<Host> unresolved = new LinkedList<Host>();
+
while (connect.size() > 0) {
- final URI uri = connect.removeFirst();
+ final Host host = connect.removeFirst();
- if (connections.containsKey(uri)) continue;
+ log.debug("Initiate(uri=" + host.getUri() + ")");
- final int port = uri.getPort();
- final String host = uri.getHost();
+ if (connections.containsKey(host.getUri())) continue;
+
+ if (!host.isDone()) {
+ unresolved.add(host);
+ log.debug("Unresolved(uri=" + host.getUri() + ")");
+ continue;
+ }
+
+ final InetSocketAddress address;
+ try {
+ address = host.getSocketAddress();
+ } catch (ExecutionException e) {
+ final Throwable t = (e.getCause() != null) ? e.getCause() : e;
+ final String message = String.format("Failed Connect{uri=%s} %s{message=\"%s\"}", host.getUri(), t.getClass().getSimpleName(), t.getMessage());
+ log.warning(message);
+ continue;
+ } catch (TimeoutException e) {
+ unresolved.add(host);
+ log.debug("Unresolved(uri=" + host.getUri() + ")");
+ continue;
+ }
try {
+ final URI uri = host.getUri();
println("open " + uri);
// Create a non-blocking NIO channel
final SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
- final InetSocketAddress address = new InetSocketAddress(host, port);
-
socketChannel.connect(address);
final Session session = new Session(socketChannel, address, uri);
@@ -625,6 +665,8 @@ public class MultipointServer {
throw new ServerRuntimeException(e);
}
}
+
+ connect.addAll(unresolved);
}
}
@@ -869,7 +911,9 @@ public class MultipointServer {
private ArrayList<URI> connections() {
synchronized (connect) {
final ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
- list.addAll(connect);
+ for (Host host : connect) {
+ list.add(host.getUri());
+ }
return list;
}
}
@@ -919,10 +963,13 @@ public class MultipointServer {
uri = normalize(uri);
if (me.equals(uri)) return;
+ final Host host = new Host(uri);
+
synchronized (connect) {
- if (!connections.containsKey(uri) && !connect.contains(uri)) {
- log.debug("Queuing{uri=" + uri + "}");
- connect.addLast(uri);
+ if (!connections.containsKey(uri) && !connect.contains(host)) {
+ log.info("Queuing{uri=" + uri + "}");
+ connect.addLast(host);
+ host.resolveDns();
}
}
}
@@ -1150,4 +1197,58 @@ public class MultipointServer {
return s;
}
+
+ private final Executor dnsResolutionQueue = Executors.newFixedThreadPool(2);
+
+ private class Host {
+ private final URI uri;
+ private final FutureTask<InetAddress> address;
+
+ private Host(URI uri) {
+ this.uri = uri;
+ this.address = new FutureTask<InetAddress>(new Callable<InetAddress>(){
+ public InetAddress call() throws Exception {
+ return InetAddress.getByName(Host.this.uri.getHost());
+ }
+ });
+ }
+
+ public void resolveDns() {
+ dnsResolutionQueue.execute(address);
+ }
+
+ public boolean isDone() {
+ return address.isDone();
+ }
+
+ public InetSocketAddress getSocketAddress() throws ExecutionException, TimeoutException {
+ try {
+ final InetAddress inetAddress = address.get(0, TimeUnit.NANOSECONDS);
+ return new InetSocketAddress(inetAddress, uri.getPort());
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new TimeoutException();
+ }
+ }
+
+ public URI getUri() {
+ return uri;
+ }
+
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Host host = (Host) o;
+
+ return uri.equals(host.uri);
+ }
+
+ @Override
+ public int hashCode() {
+ return uri.hashCode();
+ }
+ }
}
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint?rev=1407968&r1=1407967&r2=1407968&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint Sun Nov 11 13:04:55 2012
@@ -8,4 +8,4 @@ group = default
heart_rate = 500
loopback_mode = false
max_missed_heartbeats = 10
-broadcast = true
+broadcast = false