You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/11/01 03:16:48 UTC
svn commit: r1029532 - in /openejb/trunk/openejb3: ./
container/openejb-core/src/test/java/org/apache/openejb/config/
examples/alternate-descriptors/src/main/resources/META-INF/
server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery...
Author: dblevins
Date: Mon Nov 1 02:16:47 2010
New Revision: 1029532
URL: http://svn.apache.org/viewvc?rev=1029532&view=rev
Log:
svn merge -r 1027695:1027696 https://svn.apache.org/repos/asf/openejb/branches/openejb-3.1.x
http://svn.apache.org/viewvc?view=revision&revision=1027696
------------------------------------------------------------------------
r1027696 | dblevins | 2010-10-26 12:27:19 -0700 (Tue, 26 Oct 2010) | 3 lines
OPENEJB-1386: Multipoint discovery issue leading to ignored heartbeat
OPENEJB-1387: JMX DiscoveryRegistry MBean to monitor services broadcast over multicast and multipoint
------------------------------------------------------------------------
Added:
openejb/trunk/openejb3/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
- copied unchanged from r1027696, openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
Modified:
openejb/trunk/openejb3/ (props changed)
openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java (props changed)
openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml (props changed)
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
Propchange: openejb/trunk/openejb3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 1 02:16:47 2010
@@ -1,3 +1,3 @@
/openejb/branches/openejb-3.1.1:779593
-/openejb/branches/openejb-3.1.x:945409,945448,1004381,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754
+/openejb/branches/openejb-3.1.x:945409,945448,1004381,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754
/openejb/branches/openejb-jcdi:984659-985270
Propchange: openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 1 02:16:47 2010
@@ -1,3 +1,3 @@
/openejb/branches/openejb-3.1.1/container/openejb-core/src/test/java/org/apache/openejb/config/UberInterfaceTest.java:779593
-/openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754
+/openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754
/openejb/branches/openejb-jcdi/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:984659-985270
Propchange: openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Nov 1 02:16:47 2010
@@ -1,3 +1,3 @@
/openejb/branches/openejb-3.1.1/examples/alternate-descriptors/src/main/resources/META-INF/ejb-jar.xml:779593
-/openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027724,1027739,1027754
+/openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:945409,945448,1005322,1021880,1021903,1021955,1021961,1021965,1021975,1021979,1021990,1022375,1022393,1023096,1023116,1023125,1026527,1027696,1027724,1027739,1027754
/openejb/branches/openejb-jcdi/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:984659-985270
Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Mon Nov 1 02:16:47 2010
@@ -28,17 +28,10 @@ import org.apache.openejb.loader.Options
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
import java.net.Socket;
-import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Properties;
import java.util.StringTokenizer;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -126,6 +119,8 @@ public class MultipointDiscoveryAgent im
multipointServer = new MultipointServer(host, port, tracker).start();
+ this.port = multipointServer.getPort();
+
// Connect the initial set of peer servers
StringTokenizer st = new StringTokenizer(initialServers, ",");
while (st.hasMoreTokens()) {
@@ -134,7 +129,7 @@ public class MultipointDiscoveryAgent im
}
} catch (Exception e) {
- throw new ServiceException(e);
+ throw new ServiceException(port+"", e);
}
}
Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Mon Nov 1 02:16:47 2010
@@ -27,6 +27,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -50,6 +51,8 @@ import java.util.concurrent.atomic.Atomi
public class MultipointServer {
private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointServer.class);
+ private static final URI END_LIST = URI.create("end:list");
+
private final String host;
private final int port;
private final Selector selector;
@@ -67,9 +70,7 @@ public class MultipointServer {
public MultipointServer(String host, int port, Tracker tracker) throws IOException {
if (tracker == null) throw new NullPointerException("tracker cannot be null");
this.host = host;
- this.port = port;
this.tracker = tracker;
- me = URI.create("conn://" + host + ":" + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
@@ -78,6 +79,9 @@ public class MultipointServer {
serverSocket.bind(address);
serverChannel.configureBlocking(false);
+ this.port = serverSocket.getLocalPort();
+ me = URI.create("conn://" + this.host + ":" + this.port);
+
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
@@ -85,6 +89,10 @@ public class MultipointServer {
println("Listening");
}
+ public int getPort() {
+ return port;
+ }
+
public MultipointServer start() {
if (running.compareAndSet(false, true)) {
Thread thread = new Thread(new Runnable() {
@@ -130,6 +138,7 @@ public class MultipointServer {
}
public void state(int ops, State state) {
+// trace("transition "+state +" "+ops);
this.state = state;
if (ops > 0) key.interestOps(ops);
}
@@ -146,6 +155,14 @@ public class MultipointServer {
}
}
+ private void info(String str) {
+// println(message(str));
+
+ if (log.isInfoEnabled()) {
+ log.info(message(str));
+ }
+ }
+
private String message(String str) {
StringBuilder sb = new StringBuilder();
sb.append(port);
@@ -242,7 +259,12 @@ public class MultipointServer {
}
private void heartbeat() throws IOException {
- write(tracker.getRegisteredServices());
+
+ final Set<String> strings = tracker.getRegisteredServices();
+ for (String string : strings) {
+ trace(string);
+ }
+ write(strings);
state(SelectionKey.OP_READ | SelectionKey.OP_WRITE, State.HEARTBEAT);
tracker.checkServices();
@@ -280,7 +302,7 @@ public class MultipointServer {
// address of the client before sending data.
// once they send us their address, we will send our
- // full list of known addresses, followed by our own
+ // full list of known addresses, followed by the "end"
// address to signal that we are done.
// Afterward we will only pulls our heartbeat
@@ -308,14 +330,18 @@ public class MultipointServer {
// before accepting data
// once a server reads our address, it will send it's
- // full list of known addresses, followed by it's own
+ // full list of known addresses, followed by the "end"
// address to signal that it is done.
- // we will initiate connections to everyone in the list
- // who we have not yet seen.
+ // we will then send our full list of known addresses,
+ // followed by the "end" address to signal we are done.
// Afterward the server will only pulls its heartbeat
+ // separately, we will initiate connections to everyone
+ // in the list who we have not yet seen.
+
+ // WRITE our GREETING
session.write(me);
session.state(java.nio.channels.SelectionKey.OP_WRITE, State.GREETING);
@@ -329,6 +355,20 @@ public class MultipointServer {
switch (session.state) {
case GREETING: { // read
+ // This state is only reachable as a SERVER
+ // The client connected and said hello by sending
+ // its URI to let us know who they are
+
+ // Once this is read, the client will expect us
+ // to send our full list of URIs followed by the
+ // "end" address.
+
+ // So we switch to WRITE LISTING and they switch
+ // to READ LISTING
+
+ // Then we will switch to READ LISTING and they
+ // will switch to WRITE LISTING
+
String message = session.read();
if (message == null) break; // need to read more
@@ -345,8 +385,7 @@ public class MultipointServer {
// they'll know it's time to list their URIs
list.remove(me); // yank
- list.remove(session.uri); // yank
- list.add(session.uri); // add to the end
+ list.add(END_LIST); // add to the end
session.write(list);
@@ -364,42 +403,58 @@ public class MultipointServer {
while ((message = session.read()) != null) {
+ session.trace(message);
+
URI uri = URI.create(message);
- session.listed.add(uri);
+ if (END_LIST.equals(uri)) {
- session.trace(message);
+ if (session.client) {
- // they listed me, means they want my list
- if (uri.equals(me)) {
- ArrayList<URI> list = connections();
+ ArrayList<URI> list = connections();
- for (URI reported : session.listed) {
- list.remove(reported);
- }
+ for (URI reported : session.listed) {
+ list.remove(reported);
+ }
- // When they read us on the list
- // they'll know it's time to switch to heartbeat
+ // When they read us on the list
+ // they'll know it's time to switch to heartbeat
- list.remove(session.uri);
- list.remove(me); // yank if in the middle
- list.add(me); // add to the end
+ list.remove(session.uri);
+ list.add(END_LIST);
+
+ session.write(list);
- session.write(list);
+ session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
- session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+ } else {
- } else if (uri.equals(session.uri)) {
+ // We are a SERVER in this relationship, so we will have already
+ // listed our known peers by this point. From here we switch to
+ // heartbeating
+
+ // heartbeat time
+ if (session.hangup) {
+ session.state(0, State.CLOSED);
+ session.trace("hangup");
+ hangup(key);
+
+ } else {
+
+ session.trace("DONE READING");
+
+ session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+
+ }
- if (session.hangup) {
- session.state(0, State.CLOSED);
- session.trace("hangup");
- hangup(key);
- } else {
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
}
+ break;
+
} else {
+
+ session.listed.add(uri);
+
try {
connect(uri);
} catch (Exception e) {
@@ -431,6 +486,10 @@ public class MultipointServer {
switch (session.state) {
case GREETING: { // write
+ // Only CLIENTs write a GREETING message
+ // As we are a client, the first thing we do
+ // is READ the server's LIST
+
if (session.drain()) {
session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
}
@@ -442,16 +501,20 @@ public class MultipointServer {
if (session.drain()) {
- // we haven't ready any URIs yet
- if (session.listed.size() == 0) {
-
- session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
+ if (session.client) {
+ // CLIENTs list last, so at this point we've read
+ // the server's list and have written ours
+
+ session.trace("DONE WRITING");
+ session.state(SelectionKey.OP_READ, State.HEARTBEAT);
+
} else {
+ // SERVERs always write their list first, so at this
+ // point we switch to LIST READ mode
- session.trace("DONE");
+ session.state(SelectionKey.OP_READ, State.LISTING);
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
}
}
}
@@ -474,6 +537,13 @@ public class MultipointServer {
}
}
+ } catch (CancelledKeyException ex) {
+ synchronized (connect) {
+ Session session = (Session) key.attachment();
+ if (session.state != State.CLOSED) {
+ close(key);
+ }
+ }
} catch (ClosedChannelException ex) {
synchronized (connect) {
Session session = (Session) key.attachment();
@@ -493,7 +563,7 @@ public class MultipointServer {
Session session = (Session) key.attachment();
try {
- if (session != null) session.tick();
+ if (session != null && session.state == State.HEARTBEAT) session.tick();
} catch (IOException ex) {
close(key);
}
@@ -570,7 +640,6 @@ public class MultipointServer {
key.channel().close();
} catch (IOException cex) {
}
-
}
@@ -644,8 +713,8 @@ public class MultipointServer {
session = sessions[0];
duplicate = sessions[1];
- session.trace(session + "@" + session.hashCode() + " KEEP");
- duplicate.trace(duplicate + "@" + duplicate.hashCode() + " KILL");
+ session.info(session + "@" + session.hashCode() + " KEEP");
+ duplicate.info(duplicate + "@" + duplicate.hashCode() + " KILL");
duplicate.hangup = true;
}
Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java Mon Nov 1 02:16:47 2010
@@ -17,6 +17,7 @@
package org.apache.openejb.server;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.Managed;
import java.net.URI;
import java.util.List;
@@ -44,6 +45,9 @@ public class DiscoveryRegistry implement
private final Map<String, URI> services = new ConcurrentHashMap<String, URI>();
private final Map<String, URI> registered = new ConcurrentHashMap<String, URI>();
+ @Managed
+ private final Monitor monitor = new Monitor();
+
private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName());
@@ -125,7 +129,7 @@ public class DiscoveryRegistry implement
}
public void serviceRemoved(URI service) {
-
+ services.remove(service.toString());
for (final DiscoveryListener discoveryListener : getListeners()) {
executor.execute(new ServiceRemovedTask(discoveryListener, service));
}
@@ -168,4 +172,30 @@ public class DiscoveryRegistry implement
}
}
}
+
+ @Managed
+ private class Monitor {
+
+ @Managed
+ public String[] getDiscovered() {
+ final Set<String> set = DiscoveryRegistry.this.services.keySet();
+ return set.toArray(new String[set.size()]);
+ }
+
+ @Managed
+ public String[] getRegistered() {
+ final Set<String> set = DiscoveryRegistry.this.registered.keySet();
+ return set.toArray(new String[set.size()]);
+ }
+
+ @Managed
+ public String[] getAgents() {
+ List<String> list = new ArrayList<String>();
+ for (DiscoveryAgent agent : DiscoveryRegistry.this.agents) {
+ list.add(agent.getClass().getName());
+ }
+ return list.toArray(new String[list.size()]);
+ }
+
+ }
}
Modified: openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1029532&r1=1029531&r2=1029532&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java (original)
+++ openejb/trunk/openejb3/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java Mon Nov 1 02:16:47 2010
@@ -17,6 +17,7 @@
package org.apache.openejb.server;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.List;
@@ -25,8 +26,13 @@ import java.util.Properties;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.ManagedMBean;
+import org.apache.openejb.monitoring.ObjectNameBuilder;
import org.apache.xbean.finder.ResourceFinder;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
/**
* @version $Rev$ $Date$
* @org.apache.xbean.XBean element="simpleServiceManager"
@@ -107,6 +113,22 @@ public class SimpleServiceManager extend
}
DiscoveryRegistry registry = new DiscoveryRegistry();
+
+ // register the mbean
+ try {
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+ ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb");
+ jmxName.set("type", "Server");
+ jmxName.set("name", "DiscoveryRegistry");
+
+ ObjectName objectName = jmxName.build();
+ server.registerMBean(new ManagedMBean(registry), objectName);
+ } catch (Exception e) {
+ logger.error("Unable to register MBean ", e);
+ }
+
+
SystemInstance.get().setComponent(DiscoveryRegistry.class, registry);
ServiceFinder serviceFinder = new ServiceFinder("META-INF/");