You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2012/11/11 14:04:56 UTC

svn commit: r1407968 - in /openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main: java/org/apache/openejb/server/discovery/ resources/META-INF/org.apache.openejb.server.ServerService/

Author: dblevins
Date: Sun Nov 11 13:04:55 2012
New Revision: 1407968

URL: http://svn.apache.org/viewvc?rev=1407968&view=rev
Log:
OPENEJB-1794 Multipoint Automatic Reconnect

Modified:
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1407968&r1=1407967&r2=1407968&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Sun Nov 11 13:04:55 2012
@@ -172,7 +172,7 @@ public class MultipointDiscoveryAgent im
         try {
             if (running.compareAndSet(false, true)) {
                 log.info("MultipointDiscoveryAgent Starting");
-                final boolean broadcast = options.get("broadcast", true);
+                final boolean broadcast = options.get("broadcast", false);
                 multipointServer = new MultipointServer(host, discoveryHost, port, tracker, name, debug, roots, reconnectDelay, broadcast).start();
                 log.info("MultipointDiscoveryAgent Started");
 

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1407968&r1=1407967&r2=1407968&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Sun Nov 11 13:04:55 2012
@@ -27,10 +27,13 @@ import org.apache.openejb.util.Logger;
 import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.URI;
+import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
@@ -51,7 +54,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -86,7 +95,7 @@ public class MultipointServer {
 
     private final Tracker tracker;
 
-    private final LinkedList<URI> connect = new LinkedList<URI>();
+    private final LinkedList<Host> connect = new LinkedList<Host>();
     private final Map<URI, Session> connections = new HashMap<URI, Session>();
 
     private long joined = 0;
@@ -100,7 +109,7 @@ public class MultipointServer {
     private final Lock lock = new ReentrantLock();
     private final Condition started = lock.newCondition();
     private final Condition stopped = lock.newCondition();
-    private final AtomicBoolean broadcast = new AtomicBoolean(true);
+    private final AtomicBoolean broadcast = new AtomicBoolean(false);
 
     public MultipointServer(int port, Tracker tracker) throws IOException {
         this("localhost", "localhost", port, tracker, randomColor(), true, Collections.EMPTY_SET, new Duration(30, TimeUnit.SECONDS), true);
@@ -198,7 +207,14 @@ public class MultipointServer {
     }
 
     public List<URI> getConnectionsQueued() {
-        return new ArrayList<URI>(connect);
+        synchronized (connect) {
+            final ArrayList<URI> uris = new ArrayList<URI>(connect.size());
+            for (Host host : connect) {
+                uris.add(host.getUri());
+            }
+            return uris;
+        }
+
     }
 
     public long getReconnectDelay() {
@@ -229,10 +245,12 @@ public class MultipointServer {
         if (System.nanoTime() - joined <= reconnectDelay) return;
 
         for (URI uri : roots) {
+            final Host host = new Host(uri);
             synchronized (connect) {
-                if (!connections.containsKey(uri) && !connect.contains(uri)) {
+                if (!connections.containsKey(uri) && !connect.contains(host)) {
                     log.info("Reconnect{uri=" + uri + "}");
-                    connect.addLast(uri);
+                    connect.addLast(host);
+                    host.resolveDns();
                     this.joined = System.nanoTime();
                 }
             }
@@ -582,6 +600,7 @@ public class MultipointServer {
             // This keeps the heartbeat and rejoin regular
             selectorTimeout = adjustedSelectorTimeout(start);
         }
+        log.info("MultipointServer has terminated.");
     }
 
     private long adjustedSelectorTimeout(long start) {
@@ -593,25 +612,46 @@ public class MultipointServer {
     }
 
     private void initiateConnections() {
+
         synchronized (connect) {
+            final LinkedList<Host> unresolved = new LinkedList<Host>();
+
             while (connect.size() > 0) {
 
-                final URI uri = connect.removeFirst();
+                final Host host = connect.removeFirst();
 
-                if (connections.containsKey(uri)) continue;
+                log.debug("Initiate(uri=" + host.getUri() + ")");
 
-                final int port = uri.getPort();
-                final String host = uri.getHost();
+                if (connections.containsKey(host.getUri())) continue;
+
+                if (!host.isDone()) {
+                    unresolved.add(host);
+                    log.debug("Unresolved(uri=" + host.getUri() + ")");
+                    continue;
+                }
+
+                final InetSocketAddress address;
+                try {
+                    address = host.getSocketAddress();
+                } catch (ExecutionException e) {
+                    final Throwable t = (e.getCause() != null) ? e.getCause() : e;
+                    final String message = String.format("Failed Connect{uri=%s} %s{message=\"%s\"}", host.getUri(), t.getClass().getSimpleName(), t.getMessage());
+                    log.warning(message);
+                    continue;
+                } catch (TimeoutException e) {
+                    unresolved.add(host);
+                    log.debug("Unresolved(uri=" + host.getUri() + ")");
+                    continue;
+                }
 
                 try {
+                    final URI uri = host.getUri();
                     println("open " + uri);
 
                     // Create a non-blocking NIO channel
                     final SocketChannel socketChannel = SocketChannel.open();
                     socketChannel.configureBlocking(false);
 
-                    final InetSocketAddress address = new InetSocketAddress(host, port);
-
                     socketChannel.connect(address);
 
                     final Session session = new Session(socketChannel, address, uri);
@@ -625,6 +665,8 @@ public class MultipointServer {
                     throw new ServerRuntimeException(e);
                 }
             }
+
+            connect.addAll(unresolved);
         }
     }
 
@@ -869,7 +911,9 @@ public class MultipointServer {
     private ArrayList<URI> connections() {
         synchronized (connect) {
             final ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
-            list.addAll(connect);
+            for (Host host : connect) {
+                list.add(host.getUri());
+            }
             return list;
         }
     }
@@ -919,10 +963,13 @@ public class MultipointServer {
         uri = normalize(uri);
         if (me.equals(uri)) return;
 
+        final Host host = new Host(uri);
+
         synchronized (connect) {
-            if (!connections.containsKey(uri) && !connect.contains(uri)) {
-                log.debug("Queuing{uri=" + uri + "}");
-                connect.addLast(uri);
+            if (!connections.containsKey(uri) && !connect.contains(host)) {
+                log.info("Queuing{uri=" + uri + "}");
+                connect.addLast(host);
+                host.resolveDns();
             }
         }
     }
@@ -1150,4 +1197,58 @@ public class MultipointServer {
 
         return s;
     }
+
+    private final Executor dnsResolutionQueue = Executors.newFixedThreadPool(2);
+
+    private class Host {
+        private final URI uri;
+        private final FutureTask<InetAddress> address;
+
+        private Host(URI uri) {
+            this.uri = uri;
+            this.address = new FutureTask<InetAddress>(new Callable<InetAddress>(){
+                public InetAddress call() throws Exception {
+                    return InetAddress.getByName(Host.this.uri.getHost());
+                }
+            });
+        }
+
+        public void resolveDns() {
+            dnsResolutionQueue.execute(address);
+        }
+
+        public boolean isDone() {
+            return address.isDone();
+        }
+
+        public InetSocketAddress getSocketAddress() throws ExecutionException, TimeoutException {
+            try {
+                final InetAddress inetAddress = address.get(0, TimeUnit.NANOSECONDS);
+                return new InetSocketAddress(inetAddress, uri.getPort());
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                throw new TimeoutException();
+            }
+        }
+
+        public URI getUri() {
+            return uri;
+        }
+
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            Host host = (Host) o;
+
+            return uri.equals(host.uri);
+        }
+
+        @Override
+        public int hashCode() {
+            return uri.hashCode();
+        }
+    }
 }

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint?rev=1407968&r1=1407967&r2=1407968&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/resources/META-INF/org.apache.openejb.server.ServerService/multipoint Sun Nov 11 13:04:55 2012
@@ -8,4 +8,4 @@ group                  = default
 heart_rate             = 500
 loopback_mode          = false
 max_missed_heartbeats  = 10
-broadcast              = true
+broadcast              = false