You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/10/26 21:27:19 UTC
svn commit: r1027696 - in /openejb/branches/openejb-3.1.x/server:
openejb-multicast/src/main/java/org/apache/openejb/server/discovery/
openejb-multicast/src/test/java/org/apache/openejb/server/discovery/
openejb-server/src/main/java/org/apache/openejb/...
Author: dblevins
Date: Tue Oct 26 19:27:19 2010
New Revision: 1027696
URL: http://svn.apache.org/viewvc?rev=1027696&view=rev
Log:
OPENEJB-1386: Multipoint discovery issue leading to ignored heartbeat
OPENEJB-1387: JMX DiscoveryRegistry MBean to monitor services broadcast over multicast and multipoint
Added:
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java (with props)
Modified:
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1027696&r1=1027695&r2=1027696&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Tue Oct 26 19:27:19 2010
@@ -28,17 +28,10 @@ import org.apache.openejb.loader.Options
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.MulticastSocket;
import java.net.Socket;
-import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Properties;
import java.util.StringTokenizer;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -126,6 +119,8 @@ public class MultipointDiscoveryAgent im
multipointServer = new MultipointServer(host, port, tracker).start();
+ this.port = multipointServer.getPort();
+
// Connect the initial set of peer servers
StringTokenizer st = new StringTokenizer(initialServers, ",");
while (st.hasMoreTokens()) {
@@ -134,7 +129,7 @@ public class MultipointDiscoveryAgent im
}
} catch (Exception e) {
- throw new ServiceException(e);
+ throw new ServiceException(port+"", e);
}
}
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1027696&r1=1027695&r2=1027696&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Tue Oct 26 19:27:19 2010
@@ -27,6 +27,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@@ -50,6 +51,8 @@ import java.util.concurrent.atomic.Atomi
public class MultipointServer {
private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointServer.class);
+ private static final URI END_LIST = URI.create("end:list");
+
private final String host;
private final int port;
private final Selector selector;
@@ -67,9 +70,7 @@ public class MultipointServer {
public MultipointServer(String host, int port, Tracker tracker) throws IOException {
if (tracker == null) throw new NullPointerException("tracker cannot be null");
this.host = host;
- this.port = port;
this.tracker = tracker;
- me = URI.create("conn://" + host + ":" + port);
ServerSocketChannel serverChannel = ServerSocketChannel.open();
@@ -78,6 +79,9 @@ public class MultipointServer {
serverSocket.bind(address);
serverChannel.configureBlocking(false);
+ this.port = serverSocket.getLocalPort();
+ me = URI.create("conn://" + this.host + ":" + this.port);
+
selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
@@ -85,6 +89,10 @@ public class MultipointServer {
println("Listening");
}
+ public int getPort() {
+ return port;
+ }
+
public MultipointServer start() {
if (running.compareAndSet(false, true)) {
Thread thread = new Thread(new Runnable() {
@@ -130,6 +138,7 @@ public class MultipointServer {
}
public void state(int ops, State state) {
+// trace("transition "+state +" "+ops);
this.state = state;
if (ops > 0) key.interestOps(ops);
}
@@ -146,6 +155,14 @@ public class MultipointServer {
}
}
+ private void info(String str) {
+// println(message(str));
+
+ if (log.isInfoEnabled()) {
+ log.info(message(str));
+ }
+ }
+
private String message(String str) {
StringBuilder sb = new StringBuilder();
sb.append(port);
@@ -242,7 +259,12 @@ public class MultipointServer {
}
private void heartbeat() throws IOException {
- write(tracker.getRegisteredServices());
+
+ final Set<String> strings = tracker.getRegisteredServices();
+ for (String string : strings) {
+ trace(string);
+ }
+ write(strings);
state(SelectionKey.OP_READ | SelectionKey.OP_WRITE, State.HEARTBEAT);
tracker.checkServices();
@@ -280,7 +302,7 @@ public class MultipointServer {
// address of the client before sending data.
// once they send us their address, we will send our
- // full list of known addresses, followed by our own
+ // full list of known addresses, followed by the "end"
// address to signal that we are done.
// Afterward we will only pulse our heartbeat
@@ -308,14 +330,18 @@ public class MultipointServer {
// before accepting data
// once a server reads our address, it will send its
- // full list of known addresses, followed by it's own
+ // full list of known addresses, followed by the "end"
// address to signal that it is done.
- // we will initiate connections to everyone in the list
- // who we have not yet seen.
+ // we will then send our full list of known addresses,
+ // followed by the "end" address to signal we are done.
// Afterward the server will only pulse its heartbeat
+ // separately, we will initiate connections to everyone
+ // in the list who we have not yet seen.
+
+ // WRITE our GREETING
session.write(me);
session.state(java.nio.channels.SelectionKey.OP_WRITE, State.GREETING);
@@ -329,6 +355,20 @@ public class MultipointServer {
switch (session.state) {
case GREETING: { // read
+ // This state is only reachable as a SERVER
+ // The client connected and said hello by sending
+ // its URI to let us know who they are
+
+ // Once this is read, the client will expect us
+ // to send our full list of URIs followed by the
+ // "end" address.
+
+ // So we switch to WRITE LISTING and they switch
+ // to READ LISTING
+
+ // Then we will switch to READ LISTING and they
+ // will switch to WRITE LISTING
+
String message = session.read();
if (message == null) break; // need to read more
@@ -345,8 +385,7 @@ public class MultipointServer {
// they'll know it's time to list their URIs
list.remove(me); // yank
- list.remove(session.uri); // yank
- list.add(session.uri); // add to the end
+ list.add(END_LIST); // add to the end
session.write(list);
@@ -364,42 +403,58 @@ public class MultipointServer {
while ((message = session.read()) != null) {
+ session.trace(message);
+
URI uri = URI.create(message);
- session.listed.add(uri);
+ if (END_LIST.equals(uri)) {
- session.trace(message);
+ if (session.client) {
- // they listed me, means they want my list
- if (uri.equals(me)) {
- ArrayList<URI> list = connections();
+ ArrayList<URI> list = connections();
- for (URI reported : session.listed) {
- list.remove(reported);
- }
+ for (URI reported : session.listed) {
+ list.remove(reported);
+ }
- // When they read us on the list
- // they'll know it's time to switch to heartbeat
+ // When they read the "end" address on the list
+ // they'll know it's time to switch to heartbeat
- list.remove(session.uri);
- list.remove(me); // yank if in the middle
- list.add(me); // add to the end
+ list.remove(session.uri);
+ list.add(END_LIST);
+
+ session.write(list);
- session.write(list);
+ session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
- session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+ } else {
- } else if (uri.equals(session.uri)) {
+ // We are a SERVER in this relationship, so we will have already
+ // listed our known peers by this point. From here we switch to
+ // heartbeating
+
+ // heartbeat time
+ if (session.hangup) {
+ session.state(0, State.CLOSED);
+ session.trace("hangup");
+ hangup(key);
+
+ } else {
+
+ session.trace("DONE READING");
+
+ session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+
+ }
- if (session.hangup) {
- session.state(0, State.CLOSED);
- session.trace("hangup");
- hangup(key);
- } else {
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
}
+ break;
+
} else {
+
+ session.listed.add(uri);
+
try {
connect(uri);
} catch (Exception e) {
@@ -431,6 +486,10 @@ public class MultipointServer {
switch (session.state) {
case GREETING: { // write
+ // Only CLIENTs write a GREETING message
+ // As we are a client, the first thing we do
+ // is READ the server's LIST
+
if (session.drain()) {
session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
}
@@ -442,16 +501,20 @@ public class MultipointServer {
if (session.drain()) {
- // we haven't ready any URIs yet
- if (session.listed.size() == 0) {
-
- session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
+ if (session.client) {
+ // CLIENTs list last, so at this point we've read
+ // the server's list and have written ours
+
+ session.trace("DONE WRITING");
+ session.state(SelectionKey.OP_READ, State.HEARTBEAT);
+
} else {
+ // SERVERs always write their list first, so at this
+ // point we switch to LIST READ mode
- session.trace("DONE");
+ session.state(SelectionKey.OP_READ, State.LISTING);
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
}
}
}
@@ -474,6 +537,13 @@ public class MultipointServer {
}
}
+ } catch (CancelledKeyException ex) {
+ synchronized (connect) {
+ Session session = (Session) key.attachment();
+ if (session.state != State.CLOSED) {
+ close(key);
+ }
+ }
} catch (ClosedChannelException ex) {
synchronized (connect) {
Session session = (Session) key.attachment();
@@ -493,7 +563,7 @@ public class MultipointServer {
Session session = (Session) key.attachment();
try {
- if (session != null) session.tick();
+ if (session != null && session.state == State.HEARTBEAT) session.tick();
} catch (IOException ex) {
close(key);
}
@@ -570,7 +640,6 @@ public class MultipointServer {
key.channel().close();
} catch (IOException cex) {
}
-
}
@@ -644,8 +713,8 @@ public class MultipointServer {
session = sessions[0];
duplicate = sessions[1];
- session.trace(session + "@" + session.hashCode() + " KEEP");
- duplicate.trace(duplicate + "@" + duplicate.hashCode() + " KILL");
+ session.info(session + "@" + session.hashCode() + " KEEP");
+ duplicate.info(duplicate + "@" + duplicate.hashCode() + " KILL");
duplicate.hangup = true;
}
Added: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java?rev=1027696&view=auto
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java (added)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java Tue Oct 26 19:27:19 2010
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import junit.framework.TestCase;
+import org.apache.openejb.server.DiscoveryListener;
+import org.apache.openejb.server.DiscoveryRegistry;
+import org.apache.openejb.util.Join;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class MultipointDiscoveryAgentTest extends TestCase {
+
+ public void test() throws Exception {
+
+ final URI testService = new URI("green://localhost:0");
+
+ final int PEERS = 4;
+
+ final CountDownLatch[] latches = {
+ new CountDownLatch(PEERS + 1),
+ new CountDownLatch(PEERS + 1)
+ };
+
+ final DiscoveryListener listener = new DiscoveryListener() {
+ public void serviceAdded(URI service) {
+ System.out.println("added = " + service);
+ if (testService.equals(service)) {
+ latches[0].countDown();
+ }
+ }
+
+ public void serviceRemoved(URI service) {
+ System.out.println("removed = " + service);
+ if (testService.equals(service)) {
+ latches[1].countDown();
+ }
+ }
+ };
+
+ final List<Node> nodes = new ArrayList<Node>();
+ final Node root = new Node("0", listener);
+
+
+ nodes.add(root);
+
+ for (int i = 0; i < PEERS; i++) {
+ final Node node = new Node("0", listener, root.getAgent().getPort());
+ nodes.add(node);
+ }
+
+ final Node owner = nodes.get(nodes.size() / 2);
+
+
+ for (int i = 0; i < 3; i++) {
+ latches[0] = new CountDownLatch(PEERS + 1);
+ latches[1] = new CountDownLatch(PEERS + 1);
+
+ // OK, do the broadcast
+ owner.getRegistry().registerService(testService);
+
+ // Notification should have reached all participants
+ assertTrue("round=" + i + ". Add failed", latches[0].await(30, TimeUnit.SECONDS));
+
+ owner.getRegistry().unregisterService(testService);
+
+ assertTrue("round=" + i + ". Remove failed", latches[1].await(60, TimeUnit.SECONDS));
+
+ for (Node node : nodes) {
+ final Set<URI> services = node.getRegistry().getServices();
+ assertEquals("round=" + i + ". Service retained", 0, services.size());
+ }
+
+ }
+ }
+
+ public static class Node {
+ private final MultipointDiscoveryAgent agent;
+ private final DiscoveryRegistry registry;
+
+ public Node(String p, DiscoveryListener listener, int... peers) throws Exception {
+ this.agent = new MultipointDiscoveryAgent();
+ final Properties props = new Properties();
+ props.put("port", p);
+
+ List<String> uris = new ArrayList<String>(peers.length);
+ for (int port : peers) {
+ uris.add("localhost:"+port);
+ }
+
+ props.put("initialServers", Join.join(",", uris));
+ agent.init(props);
+
+ this.registry = new DiscoveryRegistry(agent);
+ this.registry.addDiscoveryListener(listener);
+ agent.start();
+ }
+
+ public MultipointDiscoveryAgent getAgent() {
+ return agent;
+ }
+
+ public DiscoveryRegistry getRegistry() {
+ return registry;
+ }
+ }
+}
Propchange: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java?rev=1027696&r1=1027695&r2=1027696&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/DiscoveryRegistry.java Tue Oct 26 19:27:19 2010
@@ -17,6 +17,7 @@
package org.apache.openejb.server;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.Managed;
import java.net.URI;
import java.util.List;
@@ -44,6 +45,9 @@ public class DiscoveryRegistry implement
private final Map<String, URI> services = new ConcurrentHashMap<String, URI>();
private final Map<String, URI> registered = new ConcurrentHashMap<String, URI>();
+ @Managed
+ private final Monitor monitor = new Monitor();
+
private final Executor executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, DiscoveryRegistry.class.getSimpleName());
@@ -125,7 +129,7 @@ public class DiscoveryRegistry implement
}
public void serviceRemoved(URI service) {
-
+ services.remove(service.toString());
for (final DiscoveryListener discoveryListener : getListeners()) {
executor.execute(new ServiceRemovedTask(discoveryListener, service));
}
@@ -168,4 +172,30 @@ public class DiscoveryRegistry implement
}
}
}
+
+ @Managed
+ private class Monitor {
+
+ @Managed
+ public String[] getDiscovered() {
+ final Set<String> set = DiscoveryRegistry.this.services.keySet();
+ return set.toArray(new String[set.size()]);
+ }
+
+ @Managed
+ public String[] getRegistered() {
+ final Set<String> set = DiscoveryRegistry.this.registered.keySet();
+ return set.toArray(new String[set.size()]);
+ }
+
+ @Managed
+ public String[] getAgents() {
+ List<String> list = new ArrayList<String>();
+ for (DiscoveryAgent agent : DiscoveryRegistry.this.agents) {
+ list.add(agent.getClass().getName());
+ }
+ return list.toArray(new String[list.size()]);
+ }
+
+ }
}
Modified: openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java?rev=1027696&r1=1027695&r2=1027696&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-server/src/main/java/org/apache/openejb/server/SimpleServiceManager.java Tue Oct 26 19:27:19 2010
@@ -17,6 +17,7 @@
package org.apache.openejb.server;
import java.io.IOException;
+import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.List;
@@ -25,8 +26,13 @@ import java.util.Properties;
import org.apache.openejb.loader.SystemInstance;
+import org.apache.openejb.monitoring.ManagedMBean;
+import org.apache.openejb.monitoring.ObjectNameBuilder;
import org.apache.xbean.finder.ResourceFinder;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+
/**
* @version $Rev$ $Date$
* @org.apache.xbean.XBean element="simpleServiceManager"
@@ -107,6 +113,22 @@ public class SimpleServiceManager extend
}
DiscoveryRegistry registry = new DiscoveryRegistry();
+
+ // register the mbean
+ try {
+ MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+
+ ObjectNameBuilder jmxName = new ObjectNameBuilder("openejb");
+ jmxName.set("type", "Server");
+ jmxName.set("name", "DiscoveryRegistry");
+
+ ObjectName objectName = jmxName.build();
+ server.registerMBean(new ManagedMBean(registry), objectName);
+ } catch (Exception e) {
+ logger.error("Unable to register MBean ", e);
+ }
+
+
SystemInstance.get().setComponent(DiscoveryRegistry.class, registry);
ServiceFinder serviceFinder = new ServiceFinder("META-INF/");