You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/13 03:42:12 UTC

svn commit: r933460 - /openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java

Author: dblevins
Date: Tue Apr 13 01:42:11 2010
New Revision: 933460

URL: http://svn.apache.org/viewvc?rev=933460&view=rev
Log:
Duplicate detection and hangup

Modified:
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java

Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=933460&r1=933459&r2=933460&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Tue Apr 13 01:42:11 2010
@@ -24,6 +24,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
+import java.net.Socket;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
@@ -34,11 +35,13 @@ import java.nio.channels.SocketChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
@@ -54,7 +57,7 @@ public class MultipointServer {
     private final Tracker tracker;
 
     private final LinkedList<URI> connect = new LinkedList<URI>();
-    private final LinkedList<URI> connections = new LinkedList<URI>();
+    private final Map<URI, Session> connections = new HashMap<URI, Session>();
 
 
     public MultipointServer(int port, Tracker tracker) throws IOException {
@@ -106,6 +109,7 @@ public class MultipointServer {
         private ByteBuffer write;
         private State state = State.GREETING;
         private URI uri;
+        public boolean hangup;
 
         public Session(SocketChannel channel, InetSocketAddress address, URI uri) throws ClosedChannelException {
             this.channel = channel;
@@ -118,23 +122,17 @@ public class MultipointServer {
             return this;
         }
 
-        private void println(String s) {
-            trace(s);
-        }
-
         public void state(int ops, State state) {
             this.state = state;
             key.interestOps(ops);
         }
 
         public void setURI(URI uri) {
-            connected(uri);
             this.uri = uri;
         }
 
         private void trace(String str) {
-//            System.out.println();
-            System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), message(str));
+            println(message(str));
 
             if (log.isDebugEnabled()) {
                 log.debug(message(str));
@@ -291,7 +289,7 @@ public class MultipointServer {
                         Session session = (Session) key.attachment();
                         session.channel.finishConnect();
 
-                        connected(session.uri);
+                        connected(session);
 
                         // when you are a client, first say high to everyone
                         // before accepting data
@@ -324,7 +322,9 @@ public class MultipointServer {
 
                                 session.setURI(URI.create(message));
 
-                                session.println("welcome");
+                                connected(session);
+
+                                session.trace("welcome");
 
                                 ArrayList<URI> list = connections();
 
@@ -353,7 +353,7 @@ public class MultipointServer {
 
                                     session.listed.add(uri);
 
-                                    session.println(message);
+                                    session.trace(message);
 
                                     // they listed me, means they want my list
                                     if (uri.equals(me)) {
@@ -376,7 +376,12 @@ public class MultipointServer {
 
                                     } else if (uri.equals(session.uri)) {
 
-                                        session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+                                        if (session.hangup) {
+                                            println("Hanging up duplicate " + session);
+                                            hangup(key);
+                                        } else {
+                                            session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+                                        }
 
                                     } else {
                                         try {
@@ -441,7 +446,7 @@ public class MultipointServer {
 
                                     session.last = System.currentTimeMillis();
 
-                                    session.println("ping");
+                                    session.trace("ping");
 
                                     session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
 
@@ -498,7 +503,7 @@ public class MultipointServer {
 
     private ArrayList<URI> connections() {
         synchronized (connect) {
-            return new ArrayList<URI>(connections);
+            return new ArrayList<URI>(connections.keySet());
         }
     }
 
@@ -508,7 +513,11 @@ public class MultipointServer {
         synchronized (connect) {
             connections.remove(session.uri);
         }
-        
+
+        hangup(key);
+    }
+
+    private void hangup(SelectionKey key) {
         key.cancel();
         try {
             key.channel().close();
@@ -529,7 +538,7 @@ public class MultipointServer {
         if (me.equals(uri)) return;
 
         synchronized (connect) {
-            if (!connections.contains(uri)) {
+            if (!connections.containsKey(uri)) {
                 connect.addLast(uri);
             }
         }
@@ -542,14 +551,41 @@ public class MultipointServer {
         return null;
     }
 
-    private void connected(URI uri) {
-        synchronized (connect){
-            connections.add(uri);
+    private void connected(Session session) {
+        synchronized (connect) {
+            Session duplicate = connections.get(session.uri);
+
+            if (duplicate != null) {
+                println("duplicate " + session);
+
+                // At this point we know we have two sockets open
+                // to the client, one created by them and one created
+                // by us.  We will both have detected this situation
+                // and know it needs fixing.  Only one of us can hangup
+
+                Session[] sessions = {session, duplicate};
+                Arrays.sort(sessions, new Comparator<Session>() {
+                    public int compare(Session a, Session b) {
+                        return rank(a) - rank(b);
+                    }
+                });
+
+                session = sessions[0];
+                duplicate = sessions[1];
+
+                duplicate.hangup = true;
+            }
+
+            connections.put(session.uri, session);
         }
-        println("seen " + uri);
+    }
+
+    private int rank(Session session) {
+        Socket socket = session.channel.socket();
+        return socket.getLocalPort() + socket.getPort();
     }
 
     private void println(String s) {
-        System.out.println(port + " - " + s);
+        System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
     }
 }