You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2011/12/21 23:48:20 UTC
svn commit: r1221925 - in /openejb/branches/openejb-3.1.x: ./
container/openejb-core/src/test/java/org/apache/openejb/config/
examples/alternate-descriptors/src/main/resources/META-INF/
server/openejb-multicast/src/main/java/org/apache/openejb/server/d...
Author: dblevins
Date: Wed Dec 21 22:48:19 2011
New Revision: 1221925
URL: http://svn.apache.org/viewvc?rev=1221925&view=rev
Log:
svn merge -r 1221923:1221924 https://svn.apache.org/repos/asf/openejb/trunk/openejb
http://svn.apache.org/viewvc?view=revision&revision=1221924
------------------------------------------------------------------------
r1221924 | dblevins | 2011-12-21 14:46:48 -0800 (Wed, 21 Dec 2011) | 3 lines
OPENEJB-1729: Reliability of Multipoint remove event when last peer disappears
OPENEJB-1730: Reliability of multipoint discovery heartrates less than 1 second
------------------------------------------------------------------------
Modified:
openejb/branches/openejb-3.1.x/ (props changed)
openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java (props changed)
openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml (props changed)
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
Propchange: openejb/branches/openejb-3.1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 21 22:48:19 2011
@@ -1,3 +1,3 @@
/openejb/branches/openejb-3.1.1:779593
-/openejb/trunk/openejb:1182222
+/openejb/trunk/openejb:1182222,1221924
/openejb/trunk/openejb3:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948243,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1004172,1029528
Propchange: openejb/branches/openejb-3.1.x/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 21 22:48:19 2011
@@ -1,3 +1,3 @@
/openejb/branches/openejb-3.1.1/container/openejb-core/src/test/java/org/apache/openejb/config/UberInterfaceTest.java:779593
-/openejb/trunk/openejb/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:1182222
+/openejb/trunk/openejb/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:1182222,1221924
/openejb/trunk/openejb3/container/openejb-core/src/test/java/org/apache/openejb/config/BusinessInterfacesTest.java:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1004172,1029528
Propchange: openejb/branches/openejb-3.1.x/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Dec 21 22:48:19 2011
@@ -1,3 +1,3 @@
/openejb/branches/openejb-3.1.1/examples/alternate-descriptors/src/main/resources/META-INF/ejb-jar.xml:779593
-/openejb/trunk/openejb/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:1182222
+/openejb/trunk/openejb/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:1182222,1221924
/openejb/trunk/openejb3/examples/alternate-descriptors/src/main/resources/META-INF/test.ejb-jar.xml:943472,943862,943965,944757,945989,946399,946485,946489,946705,946792,946805,946814,946861,946863-946864,947010,947017,947042,948022,948241,948243,948548,949014,949233,950391,950801,951611,953191,953196,953556,955104,955496,957463,962382,962750,987030,1029528
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgent.java Wed Dec 21 22:48:19 2011
@@ -53,6 +53,16 @@ public class MultipointDiscoveryAgent im
private Tracker tracker;
private MultipointServer multipointServer;
+ private boolean debug;
+ private String name;
+
+ public MultipointDiscoveryAgent() {
+ }
+
+ public MultipointDiscoveryAgent(boolean debug, String name) {
+ this.debug = debug;
+ this.name = name;
+ }
public void init(Properties props) {
@@ -73,6 +83,7 @@ public class MultipointDiscoveryAgent im
builder.setReconnectDelay(options.get("reconnect_delay", builder.getReconnectDelay()));
builder.setExponentialBackoff(options.get("exponential_backoff", builder.getExponentialBackoff()));
builder.setMaxReconnectAttempts(options.get("max_reconnect_attempts", builder.getMaxReconnectAttempts()));
+ builder.setDebug(debug);
tracker = builder.build();
}
@@ -121,7 +132,7 @@ public class MultipointDiscoveryAgent im
try {
if (running.compareAndSet(false, true)) {
- multipointServer = new MultipointServer(host, port, tracker).start();
+ multipointServer = new MultipointServer(host, port, tracker, name, debug).start();
this.port = multipointServer.getPort();
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Wed Dec 21 22:48:19 2011
@@ -16,6 +16,7 @@
*/
package org.apache.openejb.server.discovery;
+import org.apache.openejb.util.Join;
import org.apache.openejb.util.LogCategory;
import org.apache.openejb.util.Logger;
@@ -42,7 +43,9 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -52,30 +55,46 @@ public class MultipointServer {
private static final Logger log = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), MultipointServer.class);
private static final URI END_LIST = URI.create("end:list");
-
+
private final String host;
private final int port;
private final Selector selector;
private final URI me;
+ /**
+ * Only used for toString to make debugging easier
+ */
+ private final String name;
+
+
private final Tracker tracker;
private final LinkedList<URI> connect = new LinkedList<URI>();
private final Map<URI, Session> connections = new HashMap<URI, Session>();
+ private boolean debug = false;
public MultipointServer(int port, Tracker tracker) throws IOException {
this("localhost", port, tracker);
}
public MultipointServer(String host, int port, Tracker tracker) throws IOException {
+ this(host, port, tracker, randomColor());
+ }
+
+ public MultipointServer(String host, int port, Tracker tracker, String name) throws IOException {
+ this(host, port, tracker, name, false);
+ }
+
+ public MultipointServer(String host, int port, Tracker tracker, String name, boolean debug) throws IOException {
if (tracker == null) throw new NullPointerException("tracker cannot be null");
this.host = host;
this.tracker = tracker;
-
+ this.name = name;
+ this.debug = debug;
ServerSocketChannel serverChannel = ServerSocketChannel.open();
ServerSocket serverSocket = serverChannel.socket();
- InetSocketAddress address = new InetSocketAddress(host,port);
+ InetSocketAddress address = new InetSocketAddress(host, port);
serverSocket.bind(address);
serverChannel.configureBlocking(false);
@@ -100,7 +119,7 @@ public class MultipointServer {
_run();
}
});
- thread.setName("Server." + port);
+ thread.setName(Join.join(".", "MultipointServer", name, port));
thread.start();
}
return this;
@@ -150,7 +169,7 @@ public class MultipointServer {
private void trace(String str) {
// println(message(str));
- if (log.isDebugEnabled()) {
+ if (debug && log.isDebugEnabled()) {
log.debug(message(str));
}
}
@@ -164,7 +183,9 @@ public class MultipointServer {
}
private String message(String str) {
- StringBuilder sb = new StringBuilder();
+ final StringBuilder sb = new StringBuilder();
+ sb.append(name);
+ sb.append(":");
sb.append(port);
sb.append(" ");
if (key.isValid()) {
@@ -189,11 +210,11 @@ public class MultipointServer {
}
public void write(Collection<?> uris) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
for (Object uri : uris) {
- String s = uri.toString();
- byte[] b = s.getBytes("UTF-8");
+ final String s = uri.toString();
+ final byte[] b = s.getBytes("UTF-8");
baos.write(b);
baos.write(EOF);
}
@@ -210,16 +231,16 @@ public class MultipointServer {
if (channel.read(read) == -1) throw new EOFException();
- byte[] buf = read.array();
+ final byte[] buf = read.array();
- int end = endOfText(buf, 0, read.position());
+ final int end = endOfText(buf, 0, read.position());
if (end < 0) return null;
// Copy the string without the terminator char
- String text = new String(buf, 0, end, "UTF-8");
+ final String text = new String(buf, 0, end, "UTF-8");
- int newPos = read.position() - end;
+ final int newPos = read.position() - end;
System.arraycopy(buf, end + 1, buf, 0, newPos - 1);
read.position(newPos - 1);
@@ -248,10 +269,10 @@ public class MultipointServer {
public void tick() throws IOException {
if (state != State.HEARTBEAT) return;
- long now = System.currentTimeMillis();
- long delay = now - last;
+ final long now = System.currentTimeMillis();
+ final long delay = now - last;
- if (delay > tracker.getHeartRate()) {
+ if (delay >= tracker.getHeartRate()) {
last = now;
heartbeat();
}
@@ -266,8 +287,6 @@ public class MultipointServer {
}
write(strings);
state(SelectionKey.OP_READ | SelectionKey.OP_WRITE, State.HEARTBEAT);
-
- tracker.checkServices();
}
}
@@ -278,342 +297,379 @@ public class MultipointServer {
private final AtomicBoolean running = new AtomicBoolean();
private void _run() {
+
+ // The selectorTimeout ensures that even when there are no IO events,
+ // this loop will "wake up" and execute at least as frequently as the
+ // expected heartrate.
+ //
+ // We initiate WRITE events (the heartbeats we send) in this loop, so that
+ // detail is critical.
+ long selectorTimeout = tracker.getHeartRate();
+
+ // For reliability purposes we will actually adjust the selectorTimeout
+ // on each iteration of the loop, shrinking it down just a little to
+ // account for the execution time of the loop itself.
+
while (running.get()) {
+
+ final long start = System.nanoTime();
+
try {
- selector.select(1000);
+ selector.select(selectorTimeout);
} catch (IOException ex) {
ex.printStackTrace();
break;
}
- Set keys = selector.selectedKeys();
-
- Iterator iterator = keys.iterator();
+ final Set keys = selector.selectedKeys();
+ final Iterator iterator = keys.iterator();
while (iterator.hasNext()) {
- SelectionKey key = (SelectionKey) iterator.next();
+ final SelectionKey key = (SelectionKey) iterator.next();
iterator.remove();
try {
- if (key.isAcceptable()) {
-
- // we are a server
+ if (key.isAcceptable()) doAccept(key);
- // when you are a server, we must first listen for the
- // address of the client before sending data.
+ if (key.isConnectable()) doConnect(key);
- // once they send us their address, we will send our
- // full list of known addresses, followed by the "end"
- // address to signal that we are done.
+ if (key.isReadable()) doRead(key);
- // Afterward we will only pulls our heartbeat
+ if (key.isWritable()) doWrite(key);
- ServerSocketChannel server = (ServerSocketChannel) key.channel();
- SocketChannel client = server.accept();
- InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress();
+ } catch (CancelledKeyException ex) {
+ synchronized (connect) {
+ final Session session = (Session) key.attachment();
+ if (session.state != State.CLOSED) {
+ close(key);
+ }
+ }
+ } catch (ClosedChannelException ex) {
+ synchronized (connect) {
+ final Session session = (Session) key.attachment();
+ if (session.state != State.CLOSED) {
+ close(key);
+ }
+ }
+ } catch (IOException ex) {
+ final Session session = (Session) key.attachment();
+ session.trace(ex.getClass().getSimpleName() + ": " + ex.getMessage());
+ close(key);
+ }
- client.configureBlocking(false);
+ }
- Session session = new Session(client, address, null);
- session.trace("accept");
- session.state(java.nio.channels.SelectionKey.OP_READ, State.GREETING);
- }
+ // This loop can generate WRITE keys (the heartbeats we send)
+ for (SelectionKey key : selector.keys()) {
+ final Session session = (Session) key.attachment();
- if (key.isConnectable()) {
+ try {
+ if (session != null && session.state == State.HEARTBEAT) session.tick();
+ } catch (IOException ex) {
+ close(key);
+ }
+ }
- // we are a client
+ // Here is where we actually will expire missing services
+ tracker.checkServices();
- Session session = (Session) key.attachment();
- session.channel.finishConnect();
- session.trace("connected");
+ initiateConnections();
- // when you are a client, first say high to everyone
- // before accepting data
+ selectorTimeout = adjustedSelectorTimeout(start);
+ }
+ }
- // once a server reads our address, it will send it's
- // full list of known addresses, followed by the "end"
- // address to signal that it is done.
+ private long adjustedSelectorTimeout(long start) {
+ final long end = System.nanoTime();
+ final long elapsed = TimeUnit.NANOSECONDS.toMillis(end - start);
+ final long heartRate = tracker.getHeartRate();
- // we will then send our full list of known addresses,
- // followed by the "end" address to signal we are done.
+ return Math.max(1, heartRate - elapsed);
+ }
- // Afterward the server will only pulls its heartbeat
+ private void initiateConnections() {
+ synchronized (connect) {
+ while (connect.size() > 0) {
- // separately, we will initiate connections to everyone
- // in the list who we have not yet seen.
+ final URI uri = connect.removeFirst();
- // WRITE our GREETING
- session.write(me);
+ if (connections.containsKey(uri)) continue;
- session.state(java.nio.channels.SelectionKey.OP_WRITE, State.GREETING);
- }
+ final int port = uri.getPort();
+ final String host = uri.getHost();
- if (key.isReadable()) {
+ try {
+ println("open " + uri);
+ // Create a non-blocking NIO channel
+ final SocketChannel socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
- Session session = (Session) key.attachment();
+ final InetSocketAddress address = new InetSocketAddress(host, port);
- switch (session.state) {
- case GREETING: { // read
+ socketChannel.connect(address);
- // This state is only reachable as a SERVER
- // The client connected and said hello by sending
- // its URI to let us know who they are
+ final Session session = new Session(socketChannel, address, uri);
+ session.ops(SelectionKey.OP_CONNECT);
+ session.trace("client");
+ connections.put(session.uri, session);
- // Once this is read, the client will expect us
- // to send our full list of URIs followed by the
- // "end" address.
+ // seen - needs to get maintained as "connected"
+ // TODO remove from seen
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
- // So we switch to WRITE LISTING and they switch
- // to READ LISTING
+ private void doWrite(SelectionKey key) throws IOException {
+ final Session session = (Session) key.attachment();
- // Then we will switch to READ LISTING and they
- // will switch to WRITE LISTING
-
- String message = session.read();
+ switch (session.state) {
+ case GREETING: { // write
- if (message == null) break; // need to read more
+ // Only CLIENTs write a GREETING message
+ // As we are a client, the first thing we do
+ // is READ the server's LIST
- session.setURI(URI.create(message));
+ if (session.drain()) {
+ session.state(SelectionKey.OP_READ, State.LISTING);
+ }
- connected(session);
+ }
+ break;
- session.trace("welcome");
+ case LISTING: { // write
- ArrayList<URI> list = connections();
+ if (session.drain()) {
- // When they read themselves on the list
- // they'll know it's time to list their URIs
+ if (session.client) {
+ // CLIENTs list last, so at this point we've read
+ // the server's list and have written ours
- list.remove(me); // yank
- list.add(END_LIST); // add to the end
+ session.trace("DONE WRITING");
- session.write(list);
+ session.state(SelectionKey.OP_READ, State.HEARTBEAT);
- session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+ } else {
+ // SERVERs always write their list first, so at this
+ // point we switch to LIST READ mode
- session.trace("STARTING");
+ session.state(SelectionKey.OP_READ, State.LISTING);
+ }
+ }
+ }
+ break;
- }
- break;
+ case HEARTBEAT: { // write
- case LISTING: { // read
+ if (session.drain()) {
- String message = null;
+ session.last = System.currentTimeMillis();
- while ((message = session.read()) != null) {
+ session.trace("send");
- session.trace(message);
+ session.state(SelectionKey.OP_READ, State.HEARTBEAT);
- URI uri = URI.create(message);
+ }
- if (END_LIST.equals(uri)) {
+ }
+ break;
+ }
+ }
- if (session.client) {
+ private void doRead(SelectionKey key) throws IOException {
+ final Session session = (Session) key.attachment();
- ArrayList<URI> list = connections();
+ switch (session.state) {
+ case GREETING: { // read
- for (URI reported : session.listed) {
- list.remove(reported);
- }
+ // This state is only reachable as a SERVER
+ // The client connected and said hello by sending
+ // its URI to let us know who they are
- // When they read us on the list
- // they'll know it's time to switch to heartbeat
+ // Once this is read, the client will expect us
+ // to send our full list of URIs followed by the
+ // "end" address.
- list.remove(session.uri);
- list.add(END_LIST);
-
- session.write(list);
+ // So we switch to WRITE LISTING and they switch
+ // to READ LISTING
- session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+ // Then we will switch to READ LISTING and they
+ // will switch to WRITE LISTING
- } else {
+ final String message = session.read();
- // We are a SERVER in this relationship, so we will have already
- // listed our known peers by this point. From here we switch to
- // heartbeating
-
- // heartbeat time
- if (session.hangup) {
- session.state(0, State.CLOSED);
- session.trace("hangup");
- hangup(key);
+ if (message == null) break; // need to read more
- } else {
+ session.setURI(URI.create(message));
- session.trace("DONE READING");
+ connected(session);
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+ session.trace("welcome");
- }
+ final ArrayList<URI> list = connections();
- }
+ // When they read themselves on the list
+ // they'll know it's time to list their URIs
- break;
+ list.remove(me); // yank
+ list.add(END_LIST); // add to the end
- } else {
+ session.write(list);
- session.listed.add(uri);
+ session.state(SelectionKey.OP_WRITE, State.LISTING);
- try {
- connect(uri);
- } catch (Exception e) {
- println("connect failed " + uri + " - " + e.getMessage());
- e.printStackTrace();
- }
- }
- }
+ session.trace("STARTING");
- }
- break;
- case HEARTBEAT: { // read
+ }
+ break;
- String message = null;
- while ((message = session.read()) != null) {
- tracker.processData(message);
- }
- }
- break;
- }
+ case LISTING: { // read
- }
+ String message = null;
- if (key.isWritable()) {
+ while ((message = session.read()) != null) {
- Session session = (Session) key.attachment();
+ session.trace(message);
- switch (session.state) {
- case GREETING: { // write
+ final URI uri = URI.create(message);
- // Only CLIENTs write a GREETING message
- // As we are a client, the first thing we do
- // is READ the server's LIST
-
- if (session.drain()) {
- session.state(java.nio.channels.SelectionKey.OP_READ, State.LISTING);
- }
+ if (END_LIST.equals(uri)) {
- }
- break;
+ if (session.client) {
- case LISTING: { // write
+ final ArrayList<URI> list = connections();
- if (session.drain()) {
+ for (URI reported : session.listed) {
+ list.remove(reported);
+ }
- if (session.client) {
- // CLIENTs list last, so at this point we've read
- // the server's list and have written ours
-
- session.trace("DONE WRITING");
+ // When they read us on the list
+ // they'll know it's time to switch to heartbeat
- session.state(SelectionKey.OP_READ, State.HEARTBEAT);
-
- } else {
- // SERVERs always write their list first, so at this
- // point we switch to LIST READ mode
+ list.remove(session.uri);
+ list.add(END_LIST);
- session.state(SelectionKey.OP_READ, State.LISTING);
+ session.write(list);
- }
- }
- }
- break;
+ session.state(SelectionKey.OP_WRITE, State.LISTING);
- case HEARTBEAT: { // write
+ } else {
- if (session.drain()) {
+ // We are a SERVER in this relationship, so we will have already
+ // listed our known peers by this point. From here we switch to
+ // heartbeating
- session.last = System.currentTimeMillis();
+ // heartbeat time
+ if (session.hangup) {
+ session.state(0, State.CLOSED);
+ session.trace("hangup");
+ hangup(key);
- session.trace("ping");
+ } else {
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+ session.trace("DONE READING");
- }
+ session.state(SelectionKey.OP_READ, State.HEARTBEAT);
}
- break;
- }
- }
- } catch (CancelledKeyException ex) {
- synchronized (connect) {
- Session session = (Session) key.attachment();
- if (session.state != State.CLOSED) {
- close(key);
}
- }
- } catch (ClosedChannelException ex) {
- synchronized (connect) {
- Session session = (Session) key.attachment();
- if (session.state != State.CLOSED) {
- close(key);
+
+ break;
+
+ } else {
+
+ session.listed.add(uri);
+
+ try {
+ connect(uri);
+ } catch (Exception e) {
+ println("connect failed " + uri + " - " + e.getMessage());
+ e.printStackTrace();
}
}
- } catch (IOException ex) {
- Session session = (Session) key.attachment();
- session.trace(ex.getClass().getSimpleName() + ": " + ex.getMessage());
- close(key);
}
}
+ break;
- for (SelectionKey key : selector.keys()) {
- Session session = (Session) key.attachment();
+ case HEARTBEAT: { // read
- try {
- if (session != null && session.state == State.HEARTBEAT) session.tick();
- } catch (IOException ex) {
- close(key);
+ String message = null;
+ while ((message = session.read()) != null) {
+ session.trace(message);
+ tracker.processData(message);
}
}
+ break;
+ }
+ }
- synchronized (connect) {
- while (connect.size() > 0) {
+ private void doConnect(SelectionKey key) throws IOException {
+ // we are a client
- URI uri = connect.removeFirst();
+ final Session session = (Session) key.attachment();
+ session.channel.finishConnect();
+ session.trace("connected");
- if (connections.containsKey(uri)) continue;
+ // when you are a client, first say hi to everyone
+ // before accepting data
- int port = uri.getPort();
- String host = uri.getHost();
+ // once a server reads our address, it will send its
+ // full list of known addresses, followed by the "end"
+ // address to signal that it is done.
- try {
- println("open " + uri);
+ // we will then send our full list of known addresses,
+ // followed by the "end" address to signal we are done.
- SocketChannel socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
+ // Afterward the server will only pulls its heartbeat
- InetSocketAddress address = new InetSocketAddress(host, port);
+ // separately, we will initiate connections to everyone
+ // in the list who we have not yet seen.
- socketChannel.connect(address);
+ // WRITE our GREETING
+ session.write(me);
- Session session = new Session(socketChannel, address, uri);
- session.ops(java.nio.channels.SelectionKey.OP_CONNECT);
- session.trace("client");
- connections.put(session.uri, session);
-
- // seen - needs to get maintained as "connected"
- // TODO remove from seen
- } catch (IOException e) {
- log.warning("Error connecting to " + host + ":" + port, e);
- }
- }
- }
- }
+ session.state(SelectionKey.OP_WRITE, State.GREETING);
+ }
+
+ private void doAccept(SelectionKey key) throws IOException {
+ // we are a server
+
+ // when you are a server, we must first listen for the
+ // address of the client before sending data.
+
+ // once they send us their address, we will send our
+ // full list of known addresses, followed by the "end"
+ // address to signal that we are done.
+
+ // Afterward we will only pulls our heartbeat
+
+ final ServerSocketChannel server = (ServerSocketChannel) key.channel();
+ final SocketChannel client = server.accept();
+ final InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress();
+
+ client.configureBlocking(false);
+
+ final Session session = new Session(client, address, null);
+ session.trace("accept");
+ session.state(SelectionKey.OP_READ, State.GREETING);
}
private ArrayList<URI> connections() {
synchronized (connect) {
- ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
+ final ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
list.addAll(connect);
return list;
}
}
private void close(SelectionKey key) {
- Session session = (Session) key.attachment();
+ final Session session = (Session) key.attachment();
session.state(0, State.CLOSED);
@@ -675,7 +731,7 @@ public class MultipointServer {
// by us. We will both have detected this situation
// and know it needs fixing. Only one of us can hangup
- Session[] sessions = {session, duplicate};
+ final Session[] sessions = {session, duplicate};
Arrays.sort(sessions, new Comparator<Session>() {
// Goal: Keep the connection with the lowest port number
///
@@ -700,13 +756,13 @@ public class MultipointServer {
}
private int server(Session a) {
- Socket socket = a.channel.socket();
- return a.client? socket.getPort(): socket.getLocalPort();
+ final Socket socket = a.channel.socket();
+ return a.client ? socket.getPort() : socket.getLocalPort();
}
private int client(Session a) {
- Socket socket = a.channel.socket();
- return !a.client? socket.getPort(): socket.getLocalPort();
+ final Socket socket = a.channel.socket();
+ return !a.client ? socket.getPort() : socket.getLocalPort();
}
});
@@ -724,8 +780,139 @@ public class MultipointServer {
}
private void println(String s) {
-// if (s.matches(".*(Listening|DONE|KEEP|KILL)")) {
+// if (debug && s.matches(".*(Listening|DONE|KEEP|KILL)")) {
// System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
// }
}
+
+ @Override
+ public String toString() {
+ return "MultipointServer{" +
+ "name='" + name + '\'' +
+ ", me=" + me +
+ '}';
+ }
+
+ private static String randomColor() {
+ String[] colors = {
+ "almond",
+ "amber",
+ "amethyst",
+ "apple",
+ "apricot",
+ "aqua",
+ "aquamarine",
+ "ash",
+ "azure",
+ "banana",
+ "beige",
+ "black",
+ "blue",
+ "brick",
+ "bronze",
+ "brown",
+ "burgundy",
+ "carrot",
+ "charcoal",
+ "cherry",
+ "chestnut",
+ "chocolate",
+ "chrome",
+ "cinnamon",
+ "citrine",
+ "cobalt",
+ "copper",
+ "coral",
+ "cornflower",
+ "cotton",
+ "cream",
+ "crimson",
+ "cyan",
+ "ebony",
+ "emerald",
+ "forest",
+ "fuchsia",
+ "ginger",
+ "gold",
+ "goldenrod",
+ "gray",
+ "green",
+ "grey",
+ "indigo",
+ "ivory",
+ "jade",
+ "jasmine",
+ "khaki",
+ "lava",
+ "lavender",
+ "lemon",
+ "lilac",
+ "lime",
+ "macaroni",
+ "magenta",
+ "magnolia",
+ "mahogany",
+ "malachite",
+ "mango",
+ "maroon",
+ "mauve",
+ "mint",
+ "moonstone",
+ "navy",
+ "ocean",
+ "olive",
+ "onyx",
+ "orange",
+ "orchid",
+ "papaya",
+ "peach",
+ "pear",
+ "pearl",
+ "periwinkle",
+ "pine",
+ "pink",
+ "pistachio",
+ "platinum",
+ "plum",
+ "prune",
+ "pumpkin",
+ "purple",
+ "quartz",
+ "raspberry",
+ "red",
+ "rose",
+ "rosewood",
+ "ruby",
+ "salmon",
+ "sapphire",
+ "scarlet",
+ "sienna",
+ "silver",
+ "slate",
+ "strawberry",
+ "tan",
+ "tangerine",
+ "taupe",
+ "teal",
+ "titanium",
+ "topaz",
+ "turquoise",
+ "umber",
+ "vanilla",
+ "violet",
+ "watermelon",
+ "white",
+ "yellow"
+ };
+
+ final Random random = new Random();
+ long l = random.nextLong();
+
+ if (l < 0) l *= -1;
+
+ final long index = l % colors.length;
+ final String s = colors[(int) index];
+
+ return s;
+ }
}
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java Wed Dec 21 22:48:19 2011
@@ -48,8 +48,9 @@ public class Tracker {
private final int maxReconnectAttempts;
private final long exponentialBackoff;
private final boolean useExponentialBackOff;
+ private final boolean debug;
- public Tracker(String group, long heartRate, int maxMissedHeartbeats, long reconnectDelay, long maxReconnectDelay, int maxReconnectAttempts, long exponentialBackoff, final Logger log) {
+ public Tracker(String group, long heartRate, int maxMissedHeartbeats, long reconnectDelay, long maxReconnectDelay, int maxReconnectAttempts, long exponentialBackoff, final Logger log, boolean debug) {
this.group = group;
this.groupPrefix = group + ":";
@@ -61,7 +62,7 @@ public class Tracker {
this.exponentialBackoff = exponentialBackoff;
this.useExponentialBackOff = exponentialBackoff > 1;
this.log = log;
-
+ this.debug = debug;
this.log.info("Created " + this);
}
@@ -74,6 +75,10 @@ public class Tracker {
return heartRate;
}
+ public int getMaxMissedHeartbeats() {
+ return maxMissedHeartbeats;
+ }
+
public void setDiscoveryListener(DiscoveryListener discoveryListener) {
this.discoveryListener = discoveryListener;
}
@@ -147,7 +152,7 @@ public class Tracker {
for (ServiceVitals serviceVitals : discoveredServices.values()) {
if (serviceVitals.getLastHeartbeat() < expireTime && !isSelf(serviceVitals.service)) {
- if (log.isDebugEnabled()) {
+ if (debug()) {
log.debug("Expired " + serviceVitals.service + String.format(" Timeout{lastSeen=%s, threshold=%s}", serviceVitals.getLastHeartbeat() - now, threshold ));
}
@@ -159,6 +164,10 @@ public class Tracker {
}
}
+ private boolean debug() {
+ return debug && log.isDebugEnabled();
+ }
+
private final Executor executor = new ThreadPoolExecutor(1, 1, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runable) {
Thread t = new Thread(runable, "Discovery Agent Notifier");
@@ -168,7 +177,7 @@ public class Tracker {
});
private void fireServiceRemovedEvent(final URI uri) {
- if (log.isDebugEnabled()) {
+ if (debug()) {
log.debug(String.format("Removed Service{uri=%s}", uri));
}
@@ -189,7 +198,7 @@ public class Tracker {
}
private void fireServiceAddedEvent(final URI uri) {
- if (log.isDebugEnabled()) {
+ if (debug()) {
log.debug(String.format("Added Service{uri=%s}", uri));
}
@@ -262,7 +271,7 @@ public class Tracker {
// Consider that the service recovery has succeeded if it has not
// failed in 60 seconds.
if (!dead && failureCount > 0 && (lastHeartBeat - recoveryTime) > 1000 * 60) {
- if (log.isDebugEnabled()) {
+ if (debug()) {
log.debug("I now think that the " + service + " service has recovered.");
}
failureCount = 0;
@@ -289,7 +298,7 @@ public class Tracker {
delay = reconnectDelay;
}
- if (log.isDebugEnabled()) {
+ if (debug()) {
log.debug("Remote failure of " + service + " while still receiving multicast advertisements. " +
"Advertising events will be suppressed for " + delay
+ " ms, the current failure count is: " + failureCount);
@@ -312,7 +321,7 @@ public class Tracker {
// Are we done trying to recover this guy?
if (maxReconnectAttempts > 0 && failureCount > maxReconnectAttempts) {
- if (log.isDebugEnabled()) {
+ if (debug()) {
log.debug("Max reconnect attempts of the " + service + " service has been reached.");
}
return false;
@@ -323,7 +332,7 @@ public class Tracker {
return false;
}
- if (log.isDebugEnabled()) {
+ if (debug()) {
log.debug("Resuming event advertisement of the " + service + " service.");
}
dead = false;
@@ -357,6 +366,7 @@ public class Tracker {
private long exponentialBackoff = 0;
private int maxReconnectAttempts = 10; // todo: check this out
private Logger logger;
+ private boolean debug;
// ---------------------------------
@@ -424,9 +434,17 @@ public class Tracker {
this.logger = logger;
}
+ public boolean isDebug() {
+ return debug;
+ }
+
+ public void setDebug(boolean debug) {
+ this.debug = debug;
+ }
+
public Tracker build() {
logger = Logger.getInstance(LogCategory.OPENEJB_SERVER.createChild("discovery"), Tracker.class);
- return new Tracker(group, heartRate, maxMissedHeartbeats, reconnectDelay, maxReconnectDelay, maxReconnectAttempts, exponentialBackoff, logger);
+ return new Tracker(group, heartRate, maxMissedHeartbeats, reconnectDelay, maxReconnectDelay, maxReconnectAttempts, exponentialBackoff, logger, debug);
}
}
Modified: openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java
URL: http://svn.apache.org/viewvc/openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java?rev=1221925&r1=1221924&r2=1221925&view=diff
==============================================================================
--- openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java (original)
+++ openejb/branches/openejb-3.1.x/server/openejb-multicast/src/test/java/org/apache/openejb/server/discovery/MultipointDiscoveryAgentTest.java Wed Dec 21 22:48:19 2011
@@ -20,8 +20,6 @@ import junit.framework.TestCase;
import org.apache.openejb.server.DiscoveryListener;
import org.apache.openejb.server.DiscoveryRegistry;
import org.apache.openejb.util.Join;
-import org.apache.openejb.util.LogCategory;
-import org.apache.openejb.util.Logger;
import java.net.URI;
import java.util.ArrayList;
@@ -65,13 +63,13 @@ public class MultipointDiscoveryAgentTes
};
final List<Node> nodes = new ArrayList<Node>();
- final Node root = new Node("0", listener);
+ final Node root = new Node(0, listener);
nodes.add(root);
for (int i = 0; i < PEERS; i++) {
- final Node node = new Node("0", listener, root.getAgent().getPort());
+ final Node node = new Node(0, listener, root.getAgent().getPort());
nodes.add(node);
}
@@ -100,14 +98,50 @@ public class MultipointDiscoveryAgentTes
}
}
+ public void _debug() throws Exception {
+ System.setProperty("log4j.category.OpenEJB.server.discovery", "debug");
+
+ System.setProperty("log4j.appender.C.layout", "org.apache.log4j.PatternLayout");
+ System.setProperty("log4j.appender.C.layout.ConversionPattern", "%d - %m%n");
+
+ final URI greenService = new URI("green://localhost:5555");
+ final Node green = new Node(5555, new Listener("green"), true, "green", 5000);
+
+ green.getRegistry().registerService(greenService);
+
+// launch(green, "blue", 4444);
+// launch(green, "red", 6666);
+// launch(green, "yellow", 8888);
+ final Node orange = launch(green, "orange", 7777);
+
+ Thread.sleep(500000);
+
+ orange.getAgent().stop();
+
+ Thread.sleep(5000);
+
+ }
+
+ private Node launch(Node green, String color, int port) throws Exception {
+ final URI orangeService = new URI(color + "://localhost:"+ port);
+ final Node orange = new Node(port, new Listener(color), green.getPort());
+ orange.getRegistry().registerService(orangeService);
+ Thread.sleep(100);
+ return orange;
+ }
+
public static class Node {
private final MultipointDiscoveryAgent agent;
private final DiscoveryRegistry registry;
- public Node(String p, DiscoveryListener listener, int... peers) throws Exception {
- this.agent = new MultipointDiscoveryAgent();
+ public Node(int p, DiscoveryListener listener, int... peers) throws Exception {
+ this(p, listener, false, null, 5000, peers);
+ }
+
+ public Node(int p, DiscoveryListener listener, boolean debug, String name, int heartRate, int... peers) throws Exception {
+ this.agent = new MultipointDiscoveryAgent(debug, name);
final Properties props = new Properties();
- props.put("port", p);
+ props.put("port", p+"");
List<String> uris = new ArrayList<String>(peers.length);
for (int port : peers) {
@@ -115,8 +149,8 @@ public class MultipointDiscoveryAgentTes
}
props.put("initialServers", Join.join(",", uris));
- props.put("max_missed_heartbeats", "2");
- props.put("heart_rate", "200");
+ props.put("max_missed_heartbeats", "1");
+ props.put("heart_rate", ""+ heartRate);
agent.init(props);
this.registry = new DiscoveryRegistry(agent);
@@ -131,5 +165,25 @@ public class MultipointDiscoveryAgentTes
public DiscoveryRegistry getRegistry() {
return registry;
}
+
+ public int getPort() {
+ return agent.getPort();
+ }
+ }
+
+ private static class Listener implements DiscoveryListener {
+ private final String name;
+
+ private Listener(String name) {
+ this.name = name;
+ }
+
+ public void serviceAdded(URI service) {
+// System.out.printf("[%s] added = %s\n", name, service);
+ }
+
+ public void serviceRemoved(URI service) {
+// System.out.printf("[%s] removed = %s\n", name, service);
+ }
}
}