You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2011/12/21 23:48:20 UTC

svn commit: r1221925 - in /openejb/branches/openejb-3.1.x: ./ container/openejb-core/src/test/java/org/apache/openejb/config/ examples/alternate-descriptors/src/main/resources/META-INF/ server/openejb-multicast/src/main/java/org/apache/openejb/server/d...

Author: dblevins
Date: Wed Dec 21 22:48:19 2011
New Revision: 1221925

URL: http://svn.apache.org/viewvc?rev=1221925&view=rev
Log:
svn merge -r 1221923:1221924 https://svn.apache.org/repos/asf/openejb/trunk/openejb

http://svn.apache.org/viewvc?view=revision&revision=1221924
------------------------------------------------------------------------
r1221924 | dblevins | 2011-12-21 14:46:48 -0800 (Wed, 21 Dec 2011) | 3 lines

OPENEJB-1729: Reliability of Multipoint remove event when last peer disappears
OPENEJB-1730: Reliability of multipoint discovery heartrates less than 1 second

------------------------------------------------------------------------

Modified:
    openejb/branches/openejb-3.1.x/   (props changed)
    openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java   (props changed)
    openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml   (props changed)
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
    openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java

Propchange: openejb/branches/openejb-3.1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 21 22:48:19 2011
@@ -1,3 +1,3 @@
 /openejb/branches/openejb-3.1.1:779593
-/openejb/trunk/openejb:1182222
+/openejb/trunk/openejb:1182222,1221924
 /openejb/trunk/openejb3:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948243,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1004172,1029528

Propchange: openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 21 22:48:19 2011
@@ -1,3 +1,3 @@
 /openejb/branches/openejb-3.1.1/container/openejb-core/src/test/java/org/apache/openejb/config/UberInterfaceTest.java:779593
-/openejb/trunk/openejb/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:1182222
+/openejb/trunk/openejb/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:1182222,1221924
 /openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1004172,1029528

Propchange: openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 21 22:48:19 2011
@@ -1,3 +1,3 @@
 /openejb/branches/openejb-3.1.1/examples/alternate-descriptors/src/main/resources/META-INF/ejb-jar.xml:779593
-/openejb/trunk/openejb/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:1182222
+/openejb/trunk/openejb/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:1182222,1221924
 /openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948243,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1029528

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Wed Dec 21 22:48:19 2011
@@ -53,6 +53,16 @@ public class MultipointDiscoveryAgent im
 
     private Tracker tracker;
     private MultipointServer multipointServer;
+    private boolean debug;
+    private String name;
+
+    public MultipointDiscoveryAgent() {
+    }
+
+    public MultipointDiscoveryAgent(boolean debug, String name) {
+        this.debug = debug;
+        this.name = name;
+    }
 
     public void init(Properties props) {
 
@@ -73,6 +83,7 @@ public class MultipointDiscoveryAgent im
         builder.setReconnectDelay(options.get("reconnect_delay", builder.getReconnectDelay()));
         builder.setExponentialBackoff(options.get("exponential_backoff", builder.getExponentialBackoff()));
         builder.setMaxReconnectAttempts(options.get("max_reconnect_attempts", builder.getMaxReconnectAttempts()));
+        builder.setDebug(debug);
 
         tracker = builder.build();
     }
@@ -121,7 +132,7 @@ public class MultipointDiscoveryAgent im
         try {
             if (running.compareAndSet(false, true)) {
 
-                multipointServer = new MultipointServer(host, port, tracker).start();
+                multipointServer = new MultipointServer(host, port, tracker, name, debug).start();
 
                 this.port = multipointServer.getPort();
                 

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Wed Dec 21 22:48:19 2011
@@ -16,6 +16,7 @@
  */
 package org.apache.openejb.server.discovery;
 
+import org.apache.openejb.util.Join;
 import org.apache.openejb.util.LogCategory;
 import org.apache.openejb.util.Logger;
 
@@ -42,7 +43,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -52,30 +55,46 @@ public class MultipointServer {
     private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointServer.class);
 
     private static final URI END_LIST = URI.create("end:list");
-    
+
     private final String host;
     private final int port;
     private final Selector selector;
     private final URI me;
 
+    /**
+     * Only used for toString to make debugging easier
+     */
+    private final String name;
+
+
     private final Tracker tracker;
 
     private final LinkedList<URI> connect = new LinkedList<URI>();
     private final Map<URI, Session> connections = new HashMap<URI, Session>();
+    private boolean debug = false;
 
     public MultipointServer(int port, Tracker tracker) throws IOException {
         this("localhost", port, tracker);
     }
 
     public MultipointServer(String host, int port, Tracker tracker) throws IOException {
+        this(host, port, tracker, randomColor());
+    }
+
+    public MultipointServer(String host, int port, Tracker tracker, String name) throws IOException {
+        this(host, port, tracker, name, false);
+    }
+
+    public MultipointServer(String host, int port, Tracker tracker, String name, boolean debug) throws IOException {
         if (tracker == null) throw new NullPointerException("tracker cannot be null");
         this.host = host;
         this.tracker = tracker;
-
+        this.name = name;
+        this.debug = debug;
         ServerSocketChannel serverChannel = ServerSocketChannel.open();
 
         ServerSocket serverSocket = serverChannel.socket();
-        InetSocketAddress address = new InetSocketAddress(host,port);
+        InetSocketAddress address = new InetSocketAddress(host, port);
         serverSocket.bind(address);
         serverChannel.configureBlocking(false);
 
@@ -100,7 +119,7 @@ public class MultipointServer {
                     _run();
                 }
             });
-            thread.setName("Server." + port);
+            thread.setName(Join.join(".", "MultipointServer", name, port));
             thread.start();
         }
         return this;
@@ -150,7 +169,7 @@ public class MultipointServer {
         private void trace(String str) {
 //            println(message(str));
 
-            if (log.isDebugEnabled()) {
+            if (debug && log.isDebugEnabled()) {
                 log.debug(message(str));
             }
         }
@@ -164,7 +183,9 @@ public class MultipointServer {
         }
 
         private String message(String str) {
-            StringBuilder sb = new StringBuilder();
+            final StringBuilder sb = new StringBuilder();
+            sb.append(name);
+            sb.append(":");
             sb.append(port);
             sb.append(" ");
             if (key.isValid()) {
@@ -189,11 +210,11 @@ public class MultipointServer {
         }
 
         public void write(Collection<?> uris) throws IOException {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
             for (Object uri : uris) {
-                String s = uri.toString();
-                byte[] b = s.getBytes("UTF-8");
+                final String s = uri.toString();
+                final byte[] b = s.getBytes("UTF-8");
                 baos.write(b);
                 baos.write(EOF);
             }
@@ -210,16 +231,16 @@ public class MultipointServer {
 
             if (channel.read(read) == -1) throw new EOFException();
 
-            byte[] buf = read.array();
+            final byte[] buf = read.array();
 
-            int end = endOfText(buf, 0, read.position());
+            final int end = endOfText(buf, 0, read.position());
 
             if (end < 0) return null;
 
             // Copy the string without the terminator char
-            String text = new String(buf, 0, end, "UTF-8");
+            final String text = new String(buf, 0, end, "UTF-8");
 
-            int newPos = read.position() - end;
+            final int newPos = read.position() - end;
             System.arraycopy(buf, end + 1, buf, 0, newPos - 1);
             read.position(newPos - 1);
 
@@ -248,10 +269,10 @@ public class MultipointServer {
         public void tick() throws IOException {
             if (state != State.HEARTBEAT) return;
 
-            long now = System.currentTimeMillis();
-            long delay = now - last;
+            final long now = System.currentTimeMillis();
+            final long delay = now - last;
 
-            if (delay > tracker.getHeartRate()) {
+            if (delay >= tracker.getHeartRate()) {
                 last = now;
                 heartbeat();
             }
@@ -266,8 +287,6 @@ public class MultipointServer {
             }
             write(strings);
             state(SelectionKey.OP_READ | SelectionKey.OP_WRITE, State.HEARTBEAT);
-
-            tracker.checkServices();
         }
     }
 
@@ -278,342 +297,379 @@ public class MultipointServer {
     private final AtomicBoolean running = new AtomicBoolean();
 
     private void _run() {
+
+        // The selectorTimeout ensures that even when there are no IO events,
+        // this loop will "wake up" and execute at least as frequently as the
+        // expected heartrate.
+        //
+        // We initiate WRITE events (the heartbeats we send) in this loop, so that
+        // detail is critical.
+        long selectorTimeout = tracker.getHeartRate();
+
+        // For reliability purposes we will actually adjust the selectorTimeout
+        // on each iteration of the loop, shrinking it down just a little to a
+        // account for the execution time of the loop itself.
+
         while (running.get()) {
+
+            final long start = System.nanoTime();
+
             try {
-                selector.select(1000);
+                selector.select(selectorTimeout);
             } catch (IOException ex) {
                 ex.printStackTrace();
                 break;
             }
 
-            Set keys = selector.selectedKeys();
-
-            Iterator iterator = keys.iterator();
+            final Set keys = selector.selectedKeys();
+            final Iterator iterator = keys.iterator();
             while (iterator.hasNext()) {
-                SelectionKey key = (SelectionKey) iterator.next();
+                final SelectionKey key = (SelectionKey) iterator.next();
                 iterator.remove();
 
                 try {
-                    if (key.isAcceptable()) {
-
-                        // we are a server
+                    if (key.isAcceptable()) doAccept(key);
 
-                        // when you are a server, we must first listen for the
-                        // address of the client before sending data.
+                    if (key.isConnectable()) doConnect(key);
 
-                        // once they send us their address, we will send our
-                        // full list of known addresses, followed by the "end"
-                        // address to signal that we are done.
+                    if (key.isReadable()) doRead(key);
 
-                        // Afterward we will only pulls our heartbeat
+                    if (key.isWritable()) doWrite(key);
 
-                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
-                        SocketChannel client = server.accept();
-                        InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress();
+                } catch (CancelledKeyException ex) {
+                    synchronized (connect) {
+                        final Session session = (Session) key.attachment();
+                        if (session.state != State.CLOSED) {
+                            close(key);
+                        }
+                    }
+                } catch (ClosedChannelException ex) {
+                    synchronized (connect) {
+                        final Session session = (Session) key.attachment();
+                        if (session.state != State.CLOSED) {
+                            close(key);
+                        }
+                    }
+                } catch (IOException ex) {
+                    final Session session = (Session) key.attachment();
+                    session.trace(ex.getClass().getSimpleName() + ": " + ex.getMessage());
+                    close(key);
+                }
 
-                        client.configureBlocking(false);
+            }
 
-                        Session session = new Session(client, address, null);
-                        session.trace("accept");
-                        session.state(java.nio.channels.SelectionKey.OP_READ, State.GREETING);
-                    }
+            // This loop can generate WRITE keys (the heartbeats we send)
+            for (SelectionKey key : selector.keys()) {
+                final Session session = (Session) key.attachment();
 
-                    if (key.isConnectable()) {
+                try {
+                    if (session != null && session.state == State.HEARTBEAT) session.tick();
+                } catch (IOException ex) {
+                    close(key);
+                }
+            }
 
-                        // we are a client
+            // Here is where we actually will expire missing services
+            tracker.checkServices();
 
-                        Session session = (Session) key.attachment();
-                        session.channel.finishConnect();
-                        session.trace("connected");
+            initiateConnections();
 
-                        // when you are a client, first say high to everyone
-                        // before accepting data
+            selectorTimeout = adjustedSelectorTimeout(start);
+        }
+    }
 
-                        // once a server reads our address, it will send it's
-                        // full list of known addresses, followed by the "end"
-                        // address to signal that it is done.
+    private long adjustedSelectorTimeout(long start) {
+        final long end = System.nanoTime();
+        final long elapsed = TimeUnit.NANOSECONDS.toMillis(end - start);
+        final long heartRate = tracker.getHeartRate();
 
-                        // we will then send our full list of known addresses,
-                        // followed by the "end" address to signal we are done.
+        return Math.max(1, heartRate - elapsed);
+    }
 
-                        // Afterward the server will only pulls its heartbeat
+    private void initiateConnections() {
+        synchronized (connect) {
+            while (connect.size() > 0) {
 
-                        // separately, we will initiate connections to everyone
-                        // in the list who we have not yet seen.
+                final URI uri = connect.removeFirst();
 
-                        // WRITE our GREETING
-                        session.write(me);
+                if (connections.containsKey(uri)) continue;
 
-                        session.state(java.nio.channels.SelectionKey.OP_WRITE, State.GREETING);
-                    }
+                final int port = uri.getPort();
+                final String host = uri.getHost();
 
-                    if (key.isReadable()) {
+                try {
+                    println("open " + uri);
 
+                    // Create a non-blocking NIO channel
+                    final SocketChannel socketChannel = SocketChannel.open();
+                    socketChannel.configureBlocking(false);
 
-                        Session session = (Session) key.attachment();
+                    final InetSocketAddress address = new InetSocketAddress(host, port);
 
-                        switch (session.state) {
-                            case GREETING: { // read
+                    socketChannel.connect(address);
 
-                                // This state is only reachable as a SERVER
-                                // The client connected and said hello by sending
-                                // its URI to let us know who they are
+                    final Session session = new Session(socketChannel, address, uri);
+                    session.ops(SelectionKey.OP_CONNECT);
+                    session.trace("client");
+                    connections.put(session.uri, session);
 
-                                // Once this is read, the client will expect us
-                                // to send our full list of URIs followed by the
-                                // "end" address.
+                    // seen - needs to get maintained as "connected"
+                    // TODO remove from seen
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
 
-                                // So we switch to WRITE LISTING and they switch
-                                // to READ LISTING
+    private void doWrite(SelectionKey key) throws IOException {
+        final Session session = (Session) key.attachment();
 
-                                // Then we will switch to READ LISTING and they
-                                // will switch to WRITE LISTING
-                                
-                                String message = session.read();
+        switch (session.state) {
+            case GREETING: { // write
 
-                                if (message == null) break; // need to read more
+                // Only CLIENTs write a GREETING message
+                // As we are a client, the first thing we do
+                // is READ the server's LIST
 
-                                session.setURI(URI.create(message));
+                if (session.drain()) {
+                    session.state(SelectionKey.OP_READ, State.LISTING);
+                }
 
-                                connected(session);
+            }
+            break;
 
-                                session.trace("welcome");
+            case LISTING: { // write
 
-                                ArrayList<URI> list = connections();
+                if (session.drain()) {
 
-                                // When they read themselves on the list
-                                // they'll know it's time to list their URIs
+                    if (session.client) {
+                        // CLIENTs list last, so at this point we've read
+                        // the server's list and have written ours
 
-                                list.remove(me); // yank
-                                list.add(END_LIST); // add to the end
+                        session.trace("DONE WRITING");
 
-                                session.write(list);
+                        session.state(SelectionKey.OP_READ, State.HEARTBEAT);
 
-                                session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+                    } else {
+                        // SERVERs always write their list first, so at this
+                        // point we switch to LIST READ mode
 
-                                session.trace("STARTING");
+                        session.state(SelectionKey.OP_READ, State.LISTING);
 
+                    }
+                }
+            }
+            break;
 
-                            }
-                            break;
+            case HEARTBEAT: { // write
 
-                            case LISTING: { // read
+                if (session.drain()) {
 
-                                String message = null;
+                    session.last = System.currentTimeMillis();
 
-                                while ((message = session.read()) != null) {
+                    session.trace("send");
 
-                                    session.trace(message);
+                    session.state(SelectionKey.OP_READ, State.HEARTBEAT);
 
-                                    URI uri = URI.create(message);
+                }
 
-                                    if (END_LIST.equals(uri)) {
+            }
+            break;
+        }
+    }
 
-                                        if (session.client) {
+    private void doRead(SelectionKey key) throws IOException {
+        final Session session = (Session) key.attachment();
 
-                                            ArrayList<URI> list = connections();
+        switch (session.state) {
+            case GREETING: { // read
 
-                                            for (URI reported : session.listed) {
-                                                list.remove(reported);
-                                            }
+                // This state is only reachable as a SERVER
+                // The client connected and said hello by sending
+                // its URI to let us know who they are
 
-                                            // When they read us on the list
-                                            // they'll know it's time to switch to heartbeat
+                // Once this is read, the client will expect us
+                // to send our full list of URIs followed by the
+                // "end" address.
 
-                                            list.remove(session.uri);
-                                            list.add(END_LIST);
-                                            
-                                            session.write(list);
+                // So we switch to WRITE LISTING and they switch
+                // to READ LISTING
 
-                                            session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+                // Then we will switch to READ LISTING and they
+                // will switch to WRITE LISTING
 
-                                        } else {
+                final String message = session.read();
 
-                                            // We are a SERVER in this relationship, so we will have already
-                                            // listed our known peers by this point.  From here we switch to
-                                            // heartbeating
-                                            
-                                            // heartbeat time
-                                            if (session.hangup) {
-                                                session.state(0, State.CLOSED);
-                                                session.trace("hangup");
-                                                hangup(key);
+                if (message == null) break; // need to read more
 
-                                            } else {
+                session.setURI(URI.create(message));
 
-                                                session.trace("DONE READING");
+                connected(session);
 
-                                                session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+                session.trace("welcome");
 
-                                            }
+                final ArrayList<URI> list = connections();
 
-                                        }
+                // When they read themselves on the list
+                // they'll know it's time to list their URIs
 
-                                        break;
+                list.remove(me); // yank
+                list.add(END_LIST); // add to the end
 
-                                    } else {
+                session.write(list);
 
-                                        session.listed.add(uri);
+                session.state(SelectionKey.OP_WRITE, State.LISTING);
 
-                                        try {
-                                            connect(uri);
-                                        } catch (Exception e) {
-                                            println("connect failed " + uri + " - " + e.getMessage());
-                                            e.printStackTrace();
-                                        }
-                                    }
-                                }
+                session.trace("STARTING");
 
-                            }
-                            break;
 
-                            case HEARTBEAT: { // read
+            }
+            break;
 
-                                String message = null;
-                                while ((message = session.read()) != null) {
-                                    tracker.processData(message);
-                                }
-                            }
-                            break;
-                        }
+            case LISTING: { // read
 
-                    }
+                String message = null;
 
-                    if (key.isWritable()) {
+                while ((message = session.read()) != null) {
 
-                        Session session = (Session) key.attachment();
+                    session.trace(message);
 
-                        switch (session.state) {
-                            case GREETING: { // write
+                    final URI uri = URI.create(message);
 
-                                // Only CLIENTs write a GREETING message
-                                // As we are a client, the first thing we do
-                                // is READ the server's LIST
-                                
-                                if (session.drain()) {
-                                    session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
-                                }
+                    if (END_LIST.equals(uri)) {
 
-                            }
-                            break;
+                        if (session.client) {
 
-                            case LISTING: { // write
+                            final ArrayList<URI> list = connections();
 
-                                if (session.drain()) {
+                            for (URI reported : session.listed) {
+                                list.remove(reported);
+                            }
 
-                                    if (session.client) {
-                                        // CLIENTs list last, so at this point we've read
-                                        // the server's list and have written ours
-                                       
-                                        session.trace("DONE WRITING");
+                            // When they read us on the list
+                            // they'll know it's time to switch to heartbeat
 
-                                        session.state(SelectionKey.OP_READ, State.HEARTBEAT);
-                                        
-                                    } else {
-                                        // SERVERs always write their list first, so at this
-                                        // point we switch to LIST READ mode
+                            list.remove(session.uri);
+                            list.add(END_LIST);
 
-                                        session.state(SelectionKey.OP_READ, State.LISTING);
+                            session.write(list);
 
-                                    }
-                                }
-                            }
-                            break;
+                            session.state(SelectionKey.OP_WRITE, State.LISTING);
 
-                            case HEARTBEAT: { // write
+                        } else {
 
-                                if (session.drain()) {
+                            // We are a SERVER in this relationship, so we will have already
+                            // listed our known peers by this point.  From here we switch to
+                            // heartbeating
 
-                                    session.last = System.currentTimeMillis();
+                            // heartbeat time
+                            if (session.hangup) {
+                                session.state(0, State.CLOSED);
+                                session.trace("hangup");
+                                hangup(key);
 
-                                    session.trace("ping");
+                            } else {
 
-                                    session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+                                session.trace("DONE READING");
 
-                                }
+                                session.state(SelectionKey.OP_READ, State.HEARTBEAT);
 
                             }
-                            break;
-                        }
-                    }
 
-                } catch (CancelledKeyException ex) {
-                    synchronized (connect) {
-                        Session session = (Session) key.attachment();
-                        if (session.state != State.CLOSED) {
-                            close(key);
                         }
-                    }
-                } catch (ClosedChannelException ex) {
-                    synchronized (connect) {
-                        Session session = (Session) key.attachment();
-                        if (session.state != State.CLOSED) {
-                            close(key);        
+
+                        break;
+
+                    } else {
+
+                        session.listed.add(uri);
+
+                        try {
+                            connect(uri);
+                        } catch (Exception e) {
+                            println("connect failed " + uri + " - " + e.getMessage());
+                            e.printStackTrace();
                         }
                     }
-                } catch (IOException ex) {
-                    Session session = (Session) key.attachment();
-                    session.trace(ex.getClass().getSimpleName() + ": " + ex.getMessage());
-                    close(key);
                 }
 
             }
+            break;
 
-            for (SelectionKey key : selector.keys()) {
-                Session session = (Session) key.attachment();
+            case HEARTBEAT: { // read
 
-                try {
-                    if (session != null && session.state == State.HEARTBEAT) session.tick();
-                } catch (IOException ex) {
-                    close(key);
+                String message = null;
+                while ((message = session.read()) != null) {
+                    session.trace(message);
+                    tracker.processData(message);
                 }
             }
+            break;
+        }
+    }
 
-            synchronized (connect) {
-                while (connect.size() > 0) {
+    private void doConnect(SelectionKey key) throws IOException {
+        // we are a client
 
-                    URI uri = connect.removeFirst();
+        final Session session = (Session) key.attachment();
+        session.channel.finishConnect();
+        session.trace("connected");
 
-                    if (connections.containsKey(uri)) continue;
+        // when you are a client, first say high to everyone
+        // before accepting data
 
-                    int port = uri.getPort();
-                    String host = uri.getHost();
+        // once a server reads our address, it will send it's
+        // full list of known addresses, followed by the "end"
+        // address to signal that it is done.
 
-                    try {
-                        println("open " + uri);
+        // we will then send our full list of known addresses,
+        // followed by the "end" address to signal we are done.
 
-                        SocketChannel socketChannel = SocketChannel.open();
-                        socketChannel.configureBlocking(false);
+        // Afterward the server will only pulls its heartbeat
 
-                        InetSocketAddress address = new InetSocketAddress(host, port);
+        // separately, we will initiate connections to everyone
+        // in the list who we have not yet seen.
 
-                        socketChannel.connect(address);
+        // WRITE our GREETING
+        session.write(me);
 
-                        Session session = new Session(socketChannel, address, uri);
-                        session.ops(java.nio.channels.SelectionKey.OP_CONNECT);
-                        session.trace("client");
-                        connections.put(session.uri, session);
-                        
-                        // seen - needs to get maintained as "connected"
-                        // TODO remove from seen
-                    } catch (IOException e) {
-                        log.warning("Error connecting to " + host + ":" + port, e);
-                    }
-                }
-            }
-        }
+        session.state(SelectionKey.OP_WRITE, State.GREETING);
+    }
+
+    private void doAccept(SelectionKey key) throws IOException {
+        // we are a server
+
+        // when you are a server, we must first listen for the
+        // address of the client before sending data.
+
+        // once they send us their address, we will send our
+        // full list of known addresses, followed by the "end"
+        // address to signal that we are done.
+
+        // Afterward we will only pulls our heartbeat
+
+        final ServerSocketChannel server = (ServerSocketChannel) key.channel();
+        final SocketChannel client = server.accept();
+        final InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress();
+
+        client.configureBlocking(false);
+
+        final Session session = new Session(client, address, null);
+        session.trace("accept");
+        session.state(SelectionKey.OP_READ, State.GREETING);
     }
 
     private ArrayList<URI> connections() {
         synchronized (connect) {
-            ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
+            final ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
             list.addAll(connect);
             return list;
         }
     }
 
     private void close(SelectionKey key) {
-        Session session = (Session) key.attachment();
+        final Session session = (Session) key.attachment();
 
         session.state(0, State.CLOSED);
 
@@ -675,7 +731,7 @@ public class MultipointServer {
                 // by us.  We will both have detected this situation
                 // and know it needs fixing.  Only one of us can hangup
 
-                Session[] sessions = {session, duplicate};
+                final Session[] sessions = {session, duplicate};
                 Arrays.sort(sessions, new Comparator<Session>() {
                     // Goal: Keep the connection with the lowest port number
                     ///
@@ -700,13 +756,13 @@ public class MultipointServer {
                     }
 
                     private int server(Session a) {
-                        Socket socket = a.channel.socket();
-                        return a.client? socket.getPort(): socket.getLocalPort();
+                        final Socket socket = a.channel.socket();
+                        return a.client ? socket.getPort() : socket.getLocalPort();
                     }
 
                     private int client(Session a) {
-                        Socket socket = a.channel.socket();
-                        return !a.client? socket.getPort(): socket.getLocalPort();
+                        final Socket socket = a.channel.socket();
+                        return !a.client ? socket.getPort() : socket.getLocalPort();
                     }
                 });
 
@@ -724,8 +780,139 @@ public class MultipointServer {
     }
 
     private void println(String s) {
-//        if (s.matches(".*(Listening|DONE|KEEP|KILL)")) {
+//        if (debug && s.matches(".*(Listening|DONE|KEEP|KILL)")) {
 //            System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
 //        }
     }
+
+    @Override
+    public String toString() {
+        return "MultipointServer{" +
+                "name='" + name + '\'' +
+                ", me=" + me +
+                '}';
+    }
+
+    private static String randomColor() {
+        String[] colors = {
+                "almond",
+                "amber",
+                "amethyst",
+                "apple",
+                "apricot",
+                "aqua",
+                "aquamarine",
+                "ash",
+                "azure",
+                "banana",
+                "beige",
+                "black",
+                "blue",
+                "brick",
+                "bronze",
+                "brown",
+                "burgundy",
+                "carrot",
+                "charcoal",
+                "cherry",
+                "chestnut",
+                "chocolate",
+                "chrome",
+                "cinnamon",
+                "citrine",
+                "cobalt",
+                "copper",
+                "coral",
+                "cornflower",
+                "cotton",
+                "cream",
+                "crimson",
+                "cyan",
+                "ebony",
+                "emerald",
+                "forest",
+                "fuchsia",
+                "ginger",
+                "gold",
+                "goldenrod",
+                "gray",
+                "green",
+                "grey",
+                "indigo",
+                "ivory",
+                "jade",
+                "jasmine",
+                "khaki",
+                "lava",
+                "lavender",
+                "lemon",
+                "lilac",
+                "lime",
+                "macaroni",
+                "magenta",
+                "magnolia",
+                "mahogany",
+                "malachite",
+                "mango",
+                "maroon",
+                "mauve",
+                "mint",
+                "moonstone",
+                "navy",
+                "ocean",
+                "olive",
+                "onyx",
+                "orange",
+                "orchid",
+                "papaya",
+                "peach",
+                "pear",
+                "pearl",
+                "periwinkle",
+                "pine",
+                "pink",
+                "pistachio",
+                "platinum",
+                "plum",
+                "prune",
+                "pumpkin",
+                "purple",
+                "quartz",
+                "raspberry",
+                "red",
+                "rose",
+                "rosewood",
+                "ruby",
+                "salmon",
+                "sapphire",
+                "scarlet",
+                "sienna",
+                "silver",
+                "slate",
+                "strawberry",
+                "tan",
+                "tangerine",
+                "taupe",
+                "teal",
+                "titanium",
+                "topaz",
+                "turquoise",
+                "umber",
+                "vanilla",
+                "violet",
+                "watermelon",
+                "white",
+                "yellow"
+        };
+
+        final Random random = new Random();
+        long l = random.nextLong();
+
+        if (l < 0) l *= -1;
+
+        final long index = l % colors.length;
+        final String s = colors[(int) index];
+
+        return s;
+    }
 }

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java Wed Dec 21 22:48:19 2011
@@ -48,8 +48,9 @@ public class Tracker {
     private final int maxReconnectAttempts;
     private final long exponentialBackoff;
     private final boolean useExponentialBackOff;
+    private final boolean debug;
 
-    public Tracker(String group, long heartRate, int maxMissedHeartbeats, long reconnectDelay, long maxReconnectDelay, int maxReconnectAttempts, long exponentialBackoff, final Logger log) {
+    public Tracker(String group, long heartRate, int maxMissedHeartbeats, long reconnectDelay, long maxReconnectDelay, int maxReconnectAttempts, long exponentialBackoff, final Logger log, boolean debug) {
         this.group = group;
         this.groupPrefix = group + ":";
 
@@ -61,7 +62,7 @@ public class Tracker {
         this.exponentialBackoff = exponentialBackoff;
         this.useExponentialBackOff = exponentialBackoff > 1;
         this.log = log;
-
+        this.debug = debug;
         this.log.info("Created " + this);
     }
 
@@ -74,6 +75,10 @@ public class Tracker {
         return heartRate;
     }
 
+    public int getMaxMissedHeartbeats() {
+        return maxMissedHeartbeats;
+    }
+
     public void setDiscoveryListener(DiscoveryListener discoveryListener) {
         this.discoveryListener = discoveryListener;
     }
@@ -147,7 +152,7 @@ public class Tracker {
         for (ServiceVitals serviceVitals : discoveredServices.values()) {
             if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
 
-                if (log.isDebugEnabled()) {
+                if (debug()) {
                     log.debug("Expired " + serviceVitals.service + String.format(" Timeout{lastSeen=%s, threshold=%s}", serviceVitals.getLastHeartbeat() - now, threshold ));
                 }
 
@@ -159,6 +164,10 @@ public class Tracker {
         }
     }
 
+    private boolean debug() {
+        return debug && log.isDebugEnabled();
+    }
+
     private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
         public Thread newThread(Runnable runable) {
             Thread t = new Thread(runable, "Discovery Agent Notifier");
@@ -168,7 +177,7 @@ public class Tracker {
     });
 
     private void fireServiceRemovedEvent(final URI uri) {
-        if (log.isDebugEnabled()) {
+        if (debug()) {
             log.debug(String.format("Removed Service{uri=%s}", uri));
         }
 
@@ -189,7 +198,7 @@ public class Tracker {
     }
 
     private void fireServiceAddedEvent(final URI uri) {
-        if (log.isDebugEnabled()) {
+        if (debug()) {
             log.debug(String.format("Added Service{uri=%s}", uri));
         }
 
@@ -262,7 +271,7 @@ public class Tracker {
             // Consider that the service recovery has succeeded if it has not
             // failed in 60 seconds.
             if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
-                if (log.isDebugEnabled()) {
+                if (debug()) {
                     log.debug("I now think that the " + service + " service has recovered.");
                 }
                 failureCount = 0;
@@ -289,7 +298,7 @@ public class Tracker {
                     delay = reconnectDelay;
                 }
 
-                if (log.isDebugEnabled()) {
+                if (debug()) {
                     log.debug("Remote failure of " + service + " while still receiving multicast advertisements.  " +
                             "Advertising events will be suppressed for " + delay
                             + " ms, the current failure count is: " + failureCount);
@@ -312,7 +321,7 @@ public class Tracker {
 
             // Are we done trying to recover this guy?
             if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
-                if (log.isDebugEnabled()) {
+                if (debug()) {
                     log.debug("Max reconnect attempts of the " + service + " service has been reached.");
                 }
                 return false;
@@ -323,7 +332,7 @@ public class Tracker {
                 return false;
             }
 
-            if (log.isDebugEnabled()) {
+            if (debug()) {
                 log.debug("Resuming event advertisement of the " + service + " service.");
             }
             dead = false;
@@ -357,6 +366,7 @@ public class Tracker {
         private long exponentialBackoff = 0;
         private int maxReconnectAttempts = 10; // todo: check this out
         private Logger logger;
+        private boolean debug;
         // ---------------------------------
 
 
@@ -424,9 +434,17 @@ public class Tracker {
             this.logger = logger;
         }
 
+        public boolean isDebug() {
+            return debug;
+        }
+
+        public void setDebug(boolean debug) {
+            this.debug = debug;
+        }
+
         public Tracker build() {
             logger = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), Tracker.class);
-            return new Tracker(group, heartRate, maxMissedHeartbeats, reconnectDelay, maxReconnectDelay, maxReconnectAttempts, exponentialBackoff, logger);
+            return new Tracker(group, heartRate, maxMissedHeartbeats, reconnectDelay, maxReconnectDelay, maxReconnectAttempts, exponentialBackoff, logger, debug);
         }
     }
 

Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java Wed Dec 21 22:48:19 2011
@@ -20,8 +20,6 @@ import junit.framework.TestCase;
 import org.apache.openejb.server.DiscoveryListener;
 import org.apache.openejb.server.DiscoveryRegistry;
 import org.apache.openejb.util.Join;
-import org.apache.openejb.util.LogCategory;
-import org.apache.openejb.util.Logger;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -65,13 +63,13 @@ public class MultipointDiscoveryAgentTes
         };
 
         final List<Node> nodes = new ArrayList<Node>();
-        final Node root = new Node("0", listener);
+        final Node root = new Node(0, listener);
 
 
         nodes.add(root);
 
         for (int i = 0; i < PEERS; i++) {
-            final Node node = new Node("0", listener, root.getAgent().getPort());
+            final Node node = new Node(0, listener, root.getAgent().getPort());
             nodes.add(node);
         }
 
@@ -100,14 +98,50 @@ public class MultipointDiscoveryAgentTes
         }
     }
 
+    public void _debug() throws Exception {
+        System.setProperty("log4j.category.OpenEJB.server.discovery", "debug");
+
+        System.setProperty("log4j.appender.C.layout", "org.apache.log4j.PatternLayout");
+        System.setProperty("log4j.appender.C.layout.ConversionPattern", "%d - %m%n");
+
+        final URI greenService = new URI("green://localhost:5555");
+        final Node green = new Node(5555, new Listener("green"), true, "green", 5000);
+
+        green.getRegistry().registerService(greenService);
+
+//        launch(green, "blue", 4444);
+//        launch(green, "red", 6666);
+//        launch(green, "yellow", 8888);
+        final Node orange = launch(green, "orange", 7777);
+
+        Thread.sleep(500000);
+
+        orange.getAgent().stop();
+
+        Thread.sleep(5000);
+
+    }
+
+    private Node launch(Node green, String color, int port) throws Exception {
+        final URI orangeService = new URI(color + "://localhost:"+ port);
+        final Node orange = new Node(port, new Listener(color), green.getPort());
+        orange.getRegistry().registerService(orangeService);
+        Thread.sleep(100);
+        return orange;
+    }
+
     public static class Node {
         private final MultipointDiscoveryAgent agent;
         private final DiscoveryRegistry registry;
 
-        public Node(String p, DiscoveryListener listener, int... peers) throws Exception {
-            this.agent = new MultipointDiscoveryAgent();
+        public Node(int p, DiscoveryListener listener, int... peers) throws Exception {
+            this(p, listener, false, null, 5000, peers);
+        }
+
+        public Node(int p, DiscoveryListener listener, boolean debug, String name, int heartRate, int... peers) throws Exception {
+            this.agent = new MultipointDiscoveryAgent(debug, name);
             final Properties props = new Properties();
-            props.put("port", p);
+            props.put("port", p+"");
 
             List<String> uris = new ArrayList<String>(peers.length);
             for (int port : peers) {
@@ -115,8 +149,8 @@ public class MultipointDiscoveryAgentTes
             }
 
             props.put("initialServers", Join.join(",", uris));
-            props.put("max_missed_heartbeats", "2");
-            props.put("heart_rate", "200");
+            props.put("max_missed_heartbeats", "1");
+            props.put("heart_rate", ""+ heartRate);
             agent.init(props);
 
             this.registry = new DiscoveryRegistry(agent);
@@ -131,5 +165,25 @@ public class MultipointDiscoveryAgentTes
         public DiscoveryRegistry getRegistry() {
             return registry;
         }
+
+        public int getPort() {
+            return agent.getPort();
+        }
+    }
+
+    private static class Listener implements DiscoveryListener {
+        private final String name;
+
+        private Listener(String name) {
+            this.name = name;
+        }
+
+        public void serviceAdded(URI service) {
+//            System.out.printf("[%s] added = %s\n", name, service);
+        }
+
+        public void serviceRemoved(URI service) {
+//            System.out.printf("[%s] removed = %s\n", name, service);
+        }
     }
 }