You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/16 03:01:28 UTC
svn commit: r934657 - in
/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery:
EchoNet.java MultipointServer.java Tracker.java
Author: dblevins
Date: Fri Apr 16 01:01:28 2010
New Revision: 934657
URL: http://svn.apache.org/viewvc?rev=934657&view=rev
Log:
As stable as it can get. Ballons a bit before settling. Debug statements still need to be cleaned out.
Modified:
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java?rev=934657&r1=934656&r2=934657&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java Fri Apr 16 01:01:28 2010
@@ -17,18 +17,35 @@
package org.apache.openejb.server.discovery;
import java.util.concurrent.CountDownLatch;
+import java.util.HashSet;
+import java.util.Set;
/**
* @version $Rev$ $Date$
*/
public class EchoNet {
+ public static void _main(String[] args) throws Exception {
+ MultipointServer a = new MultipointServer(1111, new Tracker.Builder().build()).start();
+ MultipointServer b = new MultipointServer(3333, new Tracker.Builder().build()).start();
+ a.connect(b);
+ b.connect(a);
+ a.connect(b);
+ b.connect(a);
+ a.connect(b);
+ b.connect(a);
+ a.connect(b);
+ b.connect(a);
+ }
+
public static void main(String[] args) throws Exception {
- final int multiple = 1111;
- final int base = 3;
+ final int multiple = 1;
+ final int base = 2000;
+// final int multiple = 1111;
+// final int base = 1;
- int servers = 3;
+ int servers = 50;
if (args.length > 0)
servers = Integer.parseInt(args[0]);
@@ -54,5 +71,59 @@ public class EchoNet {
new CountDownLatch(1).await();
}
+
+ public static class Calc {
+ public static void main(String[] args) {
+ Set<Item> set = new HashSet<Item>();
+
+ int x = 150;
+
+ for (int i = 1; i <= x; i++) {
+ for (int j = 1; j <= x; j++) {
+ if (i==j) continue;
+
+ Item item = new Item(i, j);
+ boolean b = set.add(item);
+// if (b) System.out.println("item = " + item);
+ }
+ }
+
+ // 100 4950
+ System.out.println(x + " ? " + 2 + " = " + set.size());
+ }
+
+
+ static class Item {
+ int a;
+ int b;
+
+ Item(int a, int b) {
+ this.a = a;
+ this.b = b;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Item set = (Item) o;
+
+ if (a == set.a && b == set.b) return true;
+ if (a == set.b && b == set.a) return true;
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return 1;
+ }
+
+ @Override
+ public String toString() {
+ return a + " " + b;
+ }
+ }
+ }
}
Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=934657&r1=934656&r2=934657&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Fri Apr 16 01:01:28 2010
@@ -110,9 +110,11 @@ public class MultipointServer {
private State state = State.OPEN;
private URI uri;
public boolean hangup;
+ private final boolean client;
public Session(SocketChannel channel, InetSocketAddress address, URI uri) throws ClosedChannelException {
this.channel = channel;
+ this.client = uri != null;
this.uri = uri != null ? uri : URI.create("conn://" + address.getHostName() + ":" + address.getPort());
this.key = channel.register(selector, 0, this);
}
@@ -212,11 +214,13 @@ public class MultipointServer {
return "Session{" +
"uri=" + uri +
", state=" + state +
+ ", owner=" + port +
+ ", s=" + (client ? channel.socket().getPort() : channel.socket().getLocalPort()) +
+ ", c=" + (!client ? channel.socket().getPort() : channel.socket().getLocalPort()) +
+ ", " + (client ? "client" : "server") +
'}';
}
- private final long rate = 3000;
-
private long last = 0;
public void tick() throws IOException {
@@ -225,7 +229,7 @@ public class MultipointServer {
long now = System.currentTimeMillis();
long delay = now - last;
- if (delay > rate) {
+ if (delay > tracker.getHeartRate()) {
last = now;
heartbeat();
}
@@ -294,7 +298,6 @@ public class MultipointServer {
Session session = (Session) key.attachment();
session.channel.finishConnect();
session.trace("connected");
- connected(session);
// when you are a client, first say high to everyone
// before accepting data
@@ -344,6 +347,8 @@ public class MultipointServer {
session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
+ session.trace("STARTING");
+
}
break;
@@ -439,8 +444,9 @@ public class MultipointServer {
} else {
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+ session.trace("DONE");
+ session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
}
}
}
@@ -488,29 +494,36 @@ public class MultipointServer {
}
}
- URI uri = null;
- while ((uri = pending()) != null) {
+ synchronized (connect) {
+ while (connect.size() > 0) {
- int port = uri.getPort();
- String host = uri.getHost();
+ URI uri = connect.removeFirst();
- try {
- println("open " + uri);
+ if (connections.containsKey(uri)) continue;
+
+ int port = uri.getPort();
+ String host = uri.getHost();
- SocketChannel socketChannel = SocketChannel.open();
- socketChannel.configureBlocking(false);
+ try {
+ println("open " + uri);
- InetSocketAddress address = new InetSocketAddress(host, port);
+ SocketChannel socketChannel = SocketChannel.open();
+ socketChannel.configureBlocking(false);
- socketChannel.connect(address);
+ InetSocketAddress address = new InetSocketAddress(host, port);
- Session session = new Session(socketChannel, address, uri);
- session.ops(java.nio.channels.SelectionKey.OP_CONNECT);
+ socketChannel.connect(address);
- // seen - needs to get maintained as "connected"
- // TODO remove from seen
- } catch (IOException e) {
- throw new RuntimeException(e);
+ Session session = new Session(socketChannel, address, uri);
+ session.ops(java.nio.channels.SelectionKey.OP_CONNECT);
+ session.trace("client");
+ connections.put(session.uri, session);
+
+ // seen - needs to get maintained as "connected"
+ // TODO remove from seen
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
}
@@ -518,7 +531,9 @@ public class MultipointServer {
private ArrayList<URI> connections() {
synchronized (connect) {
- return new ArrayList<URI>(connections.keySet());
+ ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
+ list.addAll(connect);
+ return list;
}
}
@@ -526,10 +541,19 @@ public class MultipointServer {
Session session = (Session) key.attachment();
session.state(0, State.CLOSED);
- session.trace("closed");
-
- synchronized (connect) {
- connections.remove(session.uri);
+
+ if (session.hangup) {
+ // This was a duplicate connection and was closed
+ // do not remove this URI from the 'connections'
+ // map as this particular session is not in that
+ // map -- only the good session that will not be
+ // closed is in there.
+ session.trace("hungup");
+ } else {
+ session.trace("closed");
+ synchronized (connect) {
+ connections.remove(session.uri);
+ }
}
hangup(key);
@@ -563,17 +587,11 @@ public class MultipointServer {
}
}
- private URI pending() {
- synchronized (connect) {
- if (connect.size() > 0) return connect.removeFirst();
- }
- return null;
- }
-
private void connected(Session session) {
synchronized (connect) {
Session duplicate = connections.get(session.uri);
+// Session duplicate = null;
if (duplicate != null) {
session.trace("duplicate");
@@ -585,14 +603,45 @@ public class MultipointServer {
Session[] sessions = {session, duplicate};
Arrays.sort(sessions, new Comparator<Session>() {
+ // Goal: Keep the connection with the lowest port number
+ ///
+ // Low vs high is not very significant. The critical
+ // part is that they both choose the same connection.
+ //
+ // Port numbers are seen on both sides. There are two
+ // ports (one client and one server) for each connection.
+ //
+ // Both sides will agree to kill the connection with the
+ // lowest server port. If those are the same, then both
+ // sides will agree to kill the connection with the lowest
+ // client port. If those are the same, we still close a
+ // connection and hope for the best. If both connections
+ // are killed we will try again next time another node
+ // lists the server and we notice we are not connected.
+ //
public int compare(Session a, Session b) {
- return rank(a) - rank(b);
+ int serverRank = server(a) - server(b);
+ if (serverRank != 0) return serverRank;
+ return client(a) - client(b);
+ }
+
+ private int server(Session a) {
+ Socket socket = a.channel.socket();
+ return a.client? socket.getPort(): socket.getLocalPort();
+ }
+
+ private int client(Session a) {
+ Socket socket = a.channel.socket();
+ return !a.client? socket.getPort(): socket.getLocalPort();
}
});
session = sessions[0];
duplicate = sessions[1];
+ session.trace(session + "@" + session.hashCode() + " KEEP");
+ duplicate.trace(duplicate + "@" + duplicate.hashCode() + " KILL");
+
duplicate.hangup = true;
}
@@ -602,10 +651,13 @@ public class MultipointServer {
private int rank(Session session) {
Socket socket = session.channel.socket();
+
return socket.getLocalPort() + socket.getPort();
}
private void println(String s) {
- System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
+ if (s.matches(".*(Listening|DONE|KEEP|KILL)")) {
+ System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
+ }
}
}
Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java?rev=934657&r1=934656&r2=934657&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java Fri Apr 16 01:01:28 2010
@@ -67,6 +67,10 @@ public class Tracker {
private Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
private DiscoveryListener discoveryListener;
+ public long getHeartRate() {
+ return heartRate;
+ }
+
public void setDiscoveryListener(DiscoveryListener discoveryListener) {
this.discoveryListener = discoveryListener;
}
@@ -307,7 +311,7 @@ public class Tracker {
public static class Builder {
private String group = "default";
private int maxMissedHeartbeats = 10;
- private long heartRate = 500;
+ private long heartRate = 5000;
// ---------------------------------
// Listenting specific settings
private long reconnectDelay = 1000 * 5;