You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/13 03:42:12 UTC
svn commit: r933460 -
/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
Author: dblevins
Date: Tue Apr 13 01:42:11 2010
New Revision: 933460
URL: http://svn.apache.org/viewvc?rev=933460&view=rev
Log:
Duplicate detection and hangup
Modified:
openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=933460&r1=933459&r2=933460&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Tue Apr 13 01:42:11 2010
@@ -24,6 +24,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.net.Socket;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
@@ -34,11 +35,13 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -54,7 +57,7 @@ public class MultipointServer {
private final Tracker tracker;
private final LinkedList<URI> connect = new LinkedList<URI>();
- private final LinkedList<URI> connections = new LinkedList<URI>();
+ private final Map<URI, Session> connections = new HashMap<URI, Session>();
public MultipointServer(int port, Tracker tracker) throws IOException {
@@ -106,6 +109,7 @@ public class MultipointServer {
private ByteBuffer write;
private State state = State.GREETING;
private URI uri;
+ public boolean hangup;
public Session(SocketChannel channel, InetSocketAddress address, URI uri) throws ClosedChannelException {
this.channel = channel;
@@ -118,23 +122,17 @@ public class MultipointServer {
return this;
}
- private void println(String s) {
- trace(s);
- }
-
public void state(int ops, State state) {
this.state = state;
key.interestOps(ops);
}
public void setURI(URI uri) {
- connected(uri);
this.uri = uri;
}
private void trace(String str) {
-// System.out.println();
- System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), message(str));
+ println(message(str));
if (log.isDebugEnabled()) {
log.debug(message(str));
@@ -291,7 +289,7 @@ public class MultipointServer {
Session session = (Session) key.attachment();
session.channel.finishConnect();
- connected(session.uri);
+ connected(session);
// when you are a client, first say high to everyone
// before accepting data
@@ -324,7 +322,9 @@ public class MultipointServer {
session.setURI(URI.create(message));
- session.println("welcome");
+ connected(session);
+
+ session.trace("welcome");
ArrayList<URI> list = connections();
@@ -353,7 +353,7 @@ public class MultipointServer {
session.listed.add(uri);
- session.println(message);
+ session.trace(message);
// they listed me, means they want my list
if (uri.equals(me)) {
@@ -376,7 +376,12 @@ public class MultipointServer {
} else if (uri.equals(session.uri)) {
- session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+ if (session.hangup) {
+ println("Hanging up duplicate " + session);
+ hangup(key);
+ } else {
+ session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+ }
} else {
try {
@@ -441,7 +446,7 @@ public class MultipointServer {
session.last = System.currentTimeMillis();
- session.println("ping");
+ session.trace("ping");
session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
@@ -498,7 +503,7 @@ public class MultipointServer {
private ArrayList<URI> connections() {
synchronized (connect) {
- return new ArrayList<URI>(connections);
+ return new ArrayList<URI>(connections.keySet());
}
}
@@ -508,7 +513,11 @@ public class MultipointServer {
synchronized (connect) {
connections.remove(session.uri);
}
-
+
+ hangup(key);
+ }
+
+ private void hangup(SelectionKey key) {
key.cancel();
try {
key.channel().close();
@@ -529,7 +538,7 @@ public class MultipointServer {
if (me.equals(uri)) return;
synchronized (connect) {
- if (!connections.contains(uri)) {
+ if (!connections.containsKey(uri)) {
connect.addLast(uri);
}
}
@@ -542,14 +551,41 @@ public class MultipointServer {
return null;
}
- private void connected(URI uri) {
- synchronized (connect){
- connections.add(uri);
+ private void connected(Session session) {
+ synchronized (connect) {
+ Session duplicate = connections.get(session.uri);
+
+ if (duplicate != null) {
+ println("duplicate " + session);
+
+ // At this point we know we have two sockets open
+ // to the client, one created by them and one created
+ // by us. We will both have detected this situation
+ // and know it needs fixing. Only one of us can hangup
+
+ Session[] sessions = {session, duplicate};
+ Arrays.sort(sessions, new Comparator<Session>() {
+ public int compare(Session a, Session b) {
+ return rank(a) - rank(b);
+ }
+ });
+
+ session = sessions[0];
+ duplicate = sessions[1];
+
+ duplicate.hangup = true;
+ }
+
+ connections.put(session.uri, session);
}
- println("seen " + uri);
+ }
+
+ private int rank(Session session) {
+ Socket socket = session.channel.socket();
+ return socket.getLocalPort() + socket.getPort();
}
private void println(String s) {
- System.out.println(port + " - " + s);
+ System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
}
}