You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/11/01 03:16:48 UTC

svn commit: r1029532 - in /openejb/trunk/openejb3: ./ container/openejb-core/src/test/java/org/apache/openejb/config/ examples/alternate-descriptors/src/main/resources/META-INF/ server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery...

Author: dblevins
Date: Mon Nov  1 02:16:47 2010
New Revision: 1029532

URL: http://svn.apache.org/viewvc?rev=1029532&view=rev
Log:
svn merge -r 1027695:1027696 https://svn.apache.org/repos/asf/openejb/branches/openejb-3.1.x

http://svn.apache.org/viewvc?view=revision&revision=1027696
------------------------------------------------------------------------
r1027696 | dblevins | 2010-10-26 12:27:19 -0700 (Tue, 26 Oct 2010) | 3 lines

OPENEJB-1386: Multipoint discovery issue leading to ignored heartbeat
OPENEJB-1387: JMX DiscoverRegistry MBean to monitor services broadcast over multicast and multipoint

------------------------------------------------------------------------

Added:
    openejb/trunk/openejb3/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
      - copied unchanged from r1027696, openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
Modified:
    openejb/trunk/openejb3/   (props changed)
    openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java   (props changed)
    openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml   (props changed)
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
    openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
    openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java

Propchange: openejb/trunk/openejb3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  1 02:16:47 2010
@@ -1,3 +1,3 @@
 /openejb/branches/openejb-3.1.1:779593
-/openejb/branches/openejb-3.1.x:945409,945448,1004381,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754
+/openejb/branches/openejb-3.1.x:945409,945448,1004381,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754
 /openejb/branches/openejb-jcdi:984659-985270

Propchange: openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  1 02:16:47 2010
@@ -1,3 +1,3 @@
 /openejb/branches/openejb-3.1.1/container/openejb-core/src/test/java/org/apache/openejb/config/UberInterfaceTest.java:779593
-/openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754
+/openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754
 /openejb/branches/openejb-jcdi/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:984659-985270

Propchange: openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov  1 02:16:47 2010
@@ -1,3 +1,3 @@
 /openejb/branches/openejb-3.1.1/examples/alternate-descriptors/src/main/resources/META-INF/ejb-jar.xml:779593
-/openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754
+/openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754
 /openejb/branches/openejb-jcdi/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:984659-985270

Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Mon Nov  1 02:16:47 2010
@@ -28,17 +28,10 @@ import org.apache.openejb.loader.Options
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
 import java.net.Socket;
-import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.Properties;
 import java.util.StringTokenizer;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -126,6 +119,8 @@ public class MultipointDiscoveryAgent im
 
                 multipointServer = new MultipointServer(host, port, tracker).start();
 
+                this.port = multipointServer.getPort();
+                
                 // Connect the initial set of peer servers
                 StringTokenizer st = new StringTokenizer(initialServers, ",");
                 while (st.hasMoreTokens()) {
@@ -134,7 +129,7 @@ public class MultipointDiscoveryAgent im
 
             }
         } catch (Exception e) {
-            throw new ServiceException(e);
+            throw new ServiceException(port+"", e);
         }
     }
 

Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Mon Nov  1 02:16:47 2010
@@ -27,6 +27,7 @@ import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
@@ -50,6 +51,8 @@ import java.util.concurrent.atomic.Atomi
 public class MultipointServer {
     private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointServer.class);
 
+    private static final URI END_LIST = URI.create("end:list");
+    
     private final String host;
     private final int port;
     private final Selector selector;
@@ -67,9 +70,7 @@ public class MultipointServer {
     public MultipointServer(String host, int port, Tracker tracker) throws IOException {
         if (tracker == null) throw new NullPointerException("tracker cannot be null");
         this.host = host;
-        this.port = port;
         this.tracker = tracker;
-        me = URI.create("conn://" + host + ":" + port);
 
         ServerSocketChannel serverChannel = ServerSocketChannel.open();
 
@@ -78,6 +79,9 @@ public class MultipointServer {
         serverSocket.bind(address);
         serverChannel.configureBlocking(false);
 
+        this.port = serverSocket.getLocalPort();
+        me = URI.create("conn://" + this.host + ":" + this.port);
+
         selector = Selector.open();
 
         serverChannel.register(selector, SelectionKey.OP_ACCEPT);
@@ -85,6 +89,10 @@ public class MultipointServer {
         println("Listening");
     }
 
+    public int getPort() {
+        return port;
+    }
+
     public MultipointServer start() {
         if (running.compareAndSet(false, true)) {
             Thread thread = new Thread(new Runnable() {
@@ -130,6 +138,7 @@ public class MultipointServer {
         }
 
         public void state(int ops, State state) {
+//            trace("transition "+state +"  "+ops);
             this.state = state;
             if (ops > 0) key.interestOps(ops);
         }
@@ -146,6 +155,14 @@ public class MultipointServer {
             }
         }
 
+        private void info(String str) {
+//            println(message(str));
+
+            if (log.isInfoEnabled()) {
+                log.info(message(str));
+            }
+        }
+
         private String message(String str) {
             StringBuilder sb = new StringBuilder();
             sb.append(port);
@@ -242,7 +259,12 @@ public class MultipointServer {
         }
 
         private void heartbeat() throws IOException {
-            write(tracker.getRegisteredServices());
+
+            final Set<String> strings = tracker.getRegisteredServices();
+            for (String string : strings) {
+                trace(string);
+            }
+            write(strings);
             state(SelectionKey.OP_READ | SelectionKey.OP_WRITE, State.HEARTBEAT);
 
             tracker.checkServices();
@@ -280,7 +302,7 @@ public class MultipointServer {
                         // address of the client before sending data.
 
                         // once they send us their address, we will send our
-                        // full list of known addresses, followed by our own
+                        // full list of known addresses, followed by the "end"
                         // address to signal that we are done.
 
                         // Afterward we will only pulls our heartbeat
@@ -308,14 +330,18 @@ public class MultipointServer {
                         // before accepting data
 
                         // once a server reads our address, it will send it's
-                        // full list of known addresses, followed by it's own
+                        // full list of known addresses, followed by the "end"
                         // address to signal that it is done.
 
-                        // we will initiate connections to everyone in the list
-                        // who we have not yet seen.
+                        // we will then send our full list of known addresses,
+                        // followed by the "end" address to signal we are done.
 
                         // Afterward the server will only pulls its heartbeat
 
+                        // separately, we will initiate connections to everyone
+                        // in the list who we have not yet seen.
+
+                        // WRITE our GREETING
                         session.write(me);
 
                         session.state(java.nio.channels.SelectionKey.OP_WRITE, State.GREETING);
@@ -329,6 +355,20 @@ public class MultipointServer {
                         switch (session.state) {
                             case GREETING: { // read
 
+                                // This state is only reachable as a SERVER
+                                // The client connected and said hello by sending
+                                // its URI to let us know who they are
+
+                                // Once this is read, the client will expect us
+                                // to send our full list of URIs followed by the
+                                // "end" address.
+
+                                // So we switch to WRITE LISTING and they switch
+                                // to READ LISTING
+
+                                // Then we will switch to READ LISTING and they
+                                // will switch to WRITE LISTING
+                                
                                 String message = session.read();
 
                                 if (message == null) break; // need to read more
@@ -345,8 +385,7 @@ public class MultipointServer {
                                 // they'll know it's time to list their URIs
 
                                 list.remove(me); // yank
-                                list.remove(session.uri); // yank
-                                list.add(session.uri); // add to the end
+                                list.add(END_LIST); // add to the end
 
                                 session.write(list);
 
@@ -364,42 +403,58 @@ public class MultipointServer {
 
                                 while ((message = session.read()) != null) {
 
+                                    session.trace(message);
+
                                     URI uri = URI.create(message);
 
-                                    session.listed.add(uri);
+                                    if (END_LIST.equals(uri)) {
 
-                                    session.trace(message);
+                                        if (session.client) {
 
-                                    // they listed me, means they want my list
-                                    if (uri.equals(me)) {
-                                        ArrayList<URI> list = connections();
+                                            ArrayList<URI> list = connections();
 
-                                        for (URI reported : session.listed) {
-                                            list.remove(reported);
-                                        }
+                                            for (URI reported : session.listed) {
+                                                list.remove(reported);
+                                            }
 
-                                        // When they read us on the list
-                                        // they'll know it's time to switch to heartbeat
+                                            // When they read us on the list
+                                            // they'll know it's time to switch to heartbeat
 
-                                        list.remove(session.uri);
-                                        list.remove(me); // yank if in the middle
-                                        list.add(me); // add to the end
+                                            list.remove(session.uri);
+                                            list.add(END_LIST);
+                                            
+                                            session.write(list);
 
-                                        session.write(list);
+                                            session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
 
-                                        session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+                                        } else {
 
-                                    } else if (uri.equals(session.uri)) {
+                                            // We are a SERVER in this relationship, so we will have already
+                                            // listed our known peers by this point.  From here we switch to
+                                            // heartbeating
+                                            
+                                            // heartbeat time
+                                            if (session.hangup) {
+                                                session.state(0, State.CLOSED);
+                                                session.trace("hangup");
+                                                hangup(key);
+
+                                            } else {
+
+                                                session.trace("DONE READING");
+
+                                                session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+
+                                            }
 
-                                        if (session.hangup) {
-                                            session.state(0, State.CLOSED);
-                                            session.trace("hangup");
-                                            hangup(key);
-                                        } else {
-                                            session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
                                         }
 
+                                        break;
+
                                     } else {
+
+                                        session.listed.add(uri);
+
                                         try {
                                             connect(uri);
                                         } catch (Exception e) {
@@ -431,6 +486,10 @@ public class MultipointServer {
                         switch (session.state) {
                             case GREETING: { // write
 
+                                // Only CLIENTs write a GREETING message
+                                // As we are a client, the first thing we do
+                                // is READ the server's LIST
+                                
                                 if (session.drain()) {
                                     session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
                                 }
@@ -442,16 +501,20 @@ public class MultipointServer {
 
                                 if (session.drain()) {
 
-                                    // we haven't ready any URIs yet
-                                    if (session.listed.size() == 0) {
-
-                                        session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
+                                    if (session.client) {
+                                        // CLIENTs list last, so at this point we've read
+                                        // the server's list and have written ours
+                                       
+                                        session.trace("DONE WRITING");
 
+                                        session.state(SelectionKey.OP_READ, State.HEARTBEAT);
+                                        
                                     } else {
+                                        // SERVERs always write their list first, so at this
+                                        // point we switch to LIST READ mode
 
-                                        session.trace("DONE");
+                                        session.state(SelectionKey.OP_READ, State.LISTING);
 
-                                        session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
                                     }
                                 }
                             }
@@ -474,6 +537,13 @@ public class MultipointServer {
                         }
                     }
 
+                } catch (CancelledKeyException ex) {
+                    synchronized (connect) {
+                        Session session = (Session) key.attachment();
+                        if (session.state != State.CLOSED) {
+                            close(key);
+                        }
+                    }
                 } catch (ClosedChannelException ex) {
                     synchronized (connect) {
                         Session session = (Session) key.attachment();
@@ -493,7 +563,7 @@ public class MultipointServer {
                 Session session = (Session) key.attachment();
 
                 try {
-                    if (session != null) session.tick();
+                    if (session != null && session.state == State.HEARTBEAT) session.tick();
                 } catch (IOException ex) {
                     close(key);
                 }
@@ -570,7 +640,6 @@ public class MultipointServer {
             key.channel().close();
         } catch (IOException cex) {
         }
-
     }
 
 
@@ -644,8 +713,8 @@ public class MultipointServer {
                 session = sessions[0];
                 duplicate = sessions[1];
 
-                session.trace(session + "@" + session.hashCode() + " KEEP");
-                duplicate.trace(duplicate + "@" + duplicate.hashCode() + " KILL");
+                session.info(session + "@" + session.hashCode() + " KEEP");
+                duplicate.info(duplicate + "@" + duplicate.hashCode() + " KILL");
 
                 duplicate.hangup = true;
             }

Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java Mon Nov  1 02:16:47 2010
@@ -17,6 +17,7 @@
 package org.apache.openejb.server;
 
 import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.Managed;
 
 import java.net.URI;
 import java.util.List;
@@ -44,6 +45,9 @@ public class DiscoveryRegistry implement
     private final Map<String, URI> services = new ConcurrentHashMap<String, URI>();
     private final Map<String, URI> registered = new ConcurrentHashMap<String, URI>();
 
+    @Managed
+    private final Monitor monitor = new Monitor();
+
     private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
         public Thread newThread(Runnable runable) {
             Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName());
@@ -125,7 +129,7 @@ public class DiscoveryRegistry implement
     }
 
     public void serviceRemoved(URI service) {
-
+        services.remove(service.toString());
         for (final DiscoveryListener discoveryListener : getListeners()) {
             executor.execute(new ServiceRemovedTask(discoveryListener, service));
         }
@@ -168,4 +172,30 @@ public class DiscoveryRegistry implement
             }
         }
     }
+
+    @Managed
+    private class Monitor {
+
+        @Managed
+        public String[] getDiscovered() {
+            final Set<String> set = DiscoveryRegistry.this.services.keySet();
+            return set.toArray(new String[set.size()]);
+        }
+
+        @Managed
+        public String[] getRegistered() {
+            final Set<String> set = DiscoveryRegistry.this.registered.keySet();
+            return set.toArray(new String[set.size()]);
+        }
+
+        @Managed
+        public String[] getAgents() {
+            List<String> list = new ArrayList<String>();
+            for (DiscoveryAgent agent : DiscoveryRegistry.this.agents) {
+                list.add(agent.getClass().getName());
+            }
+            return list.toArray(new String[list.size()]);
+        }
+
+    }
 }

Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java Mon Nov  1 02:16:47 2010
@@ -17,6 +17,7 @@
 package org.apache.openejb.server;
 
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
 import java.net.InetAddress;
 import java.util.Iterator;
 import java.util.List;
@@ -25,8 +26,13 @@ import java.util.Properties;
 
 
 import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.ManagedMBean;
+import org.apache.openejb.monitoring.ObjectNameBuilder;
 import org.apache.xbean.finder.ResourceFinder;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
 /**
  * @version $Rev$ $Date$
  * @org.apache.xbean.XBean element="simpleServiceManager"
@@ -107,6 +113,22 @@ public class SimpleServiceManager extend
         }
 
         DiscoveryRegistry registry = new DiscoveryRegistry();
+
+        // register the mbean
+        try {
+            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+            ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb");
+            jmxName.set("type", "Server");
+            jmxName.set("name", "DiscoveryRegistry");
+
+            ObjectName objectName = jmxName.build();
+            server.registerMBean(new ManagedMBean(registry), objectName);
+        } catch (Exception e) {
+            logger.error("Unable to register MBean ", e);
+        }
+
+
         SystemInstance.get().setComponent(DiscoveryRegistry.class, registry);
 
         ServiceFinder serviceFinder = new ServiceFinder("META-INF/");