You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/16 03:01:28 UTC

svn commit: r934657 - in /openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery: EchoNet.java MultipointServer.java Tracker.java

Author: dblevins
Date: Fri Apr 16 01:01:28 2010
New Revision: 934657

URL: http://svn.apache.org/viewvc?rev=934657&view=rev
Log:
As stable as it can get.  Balloons a bit before settling.  Debug statements still need to be cleaned out.

Modified:
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java

Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java?rev=934657&r1=934656&r2=934657&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java Fri Apr 16 01:01:28 2010
@@ -17,18 +17,35 @@
 package org.apache.openejb.server.discovery;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * @version $Rev$ $Date$
  */
 public class EchoNet {
 
+    public static void _main(String[] args) throws Exception {
+        MultipointServer a = new MultipointServer(1111, new Tracker.Builder().build()).start();
+        MultipointServer b = new MultipointServer(3333, new Tracker.Builder().build()).start();
+        a.connect(b);
+        b.connect(a);
+        a.connect(b);
+        b.connect(a);
+        a.connect(b);
+        b.connect(a);
+        a.connect(b);
+        b.connect(a);
+    }
+
     public static void main(String[] args) throws Exception {
 
-        final int multiple = 1111;
-        final int base = 3;
+        final int multiple = 1;
+        final int base = 2000;
+//        final int multiple = 1111;
+//        final int base = 1;
 
-        int servers = 3;
+        int servers = 50;
 
         if (args.length > 0)
             servers = Integer.parseInt(args[0]);
@@ -54,5 +71,59 @@ public class EchoNet {
         new CountDownLatch(1).await();
     }
 
+
+    public static class Calc {
+        public static void main(String[] args) {
+            Set<Item> set = new HashSet<Item>();
+
+            int x = 150;
+
+            for (int i = 1; i <= x; i++) {
+                for (int j = 1; j <= x; j++) {
+                    if (i==j) continue;
+
+                    Item item = new Item(i, j);
+                    boolean b = set.add(item);
+//                    if (b) System.out.println("item = " + item);
+                }
+            }
+
+            // 100 4950
+            System.out.println(x + " ? " + 2 + " = " + set.size());
+        }
+
+
+        static class Item {
+            int a;
+            int b;
+
+            Item(int a, int b) {
+                this.a = a;
+                this.b = b;
+            }
+
+            @Override
+            public boolean equals(Object o) {
+                if (o == null || getClass() != o.getClass()) return false;
+
+                Item set = (Item) o;
+
+                if (a == set.a && b == set.b) return true;
+                if (a == set.b && b == set.a) return true;
+
+                return false;
+            }
+
+            @Override
+            public int hashCode() {
+                return 1;
+            }
+
+            @Override
+            public String toString() {
+                return a + " " + b;
+            }
+        }
+    }
 }
 

Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java?rev=934657&r1=934656&r2=934657&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/MultipointServer.java Fri Apr 16 01:01:28 2010
@@ -110,9 +110,11 @@ public class MultipointServer {
         private State state = State.OPEN;
         private URI uri;
         public boolean hangup;
+        private final boolean client;
 
         public Session(SocketChannel channel, InetSocketAddress address, URI uri) throws ClosedChannelException {
             this.channel = channel;
+            this.client = uri != null;
             this.uri = uri != null ? uri : URI.create("conn://" + address.getHostName() + ":" + address.getPort());
             this.key = channel.register(selector, 0, this);
         }
@@ -212,11 +214,13 @@ public class MultipointServer {
             return "Session{" +
                     "uri=" + uri +
                     ", state=" + state +
+                    ", owner=" + port +
+                    ", s=" + (client ? channel.socket().getPort() : channel.socket().getLocalPort()) +
+                    ", c=" + (!client ? channel.socket().getPort() : channel.socket().getLocalPort()) +
+                    ", " + (client ? "client" : "server") +
                     '}';
         }
 
-        private final long rate = 3000;
-
         private long last = 0;
 
         public void tick() throws IOException {
@@ -225,7 +229,7 @@ public class MultipointServer {
             long now = System.currentTimeMillis();
             long delay = now - last;
 
-            if (delay > rate) {
+            if (delay > tracker.getHeartRate()) {
                 last = now;
                 heartbeat();
             }
@@ -294,7 +298,6 @@ public class MultipointServer {
                         Session session = (Session) key.attachment();
                         session.channel.finishConnect();
                         session.trace("connected");
-                        connected(session);
 
                         // when you are a client, first say high to everyone
                         // before accepting data
@@ -344,6 +347,8 @@ public class MultipointServer {
 
                                 session.state(java.nio.channels.SelectionKey.OP_WRITE, State.LISTING);
 
+                                session.trace("STARTING");
+
 
                             }
                             break;
@@ -439,8 +444,9 @@ public class MultipointServer {
 
                                     } else {
 
-                                        session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
+                                        session.trace("DONE");
 
+                                        session.state(java.nio.channels.SelectionKey.OP_READ, State.HEARTBEAT);
                                     }
                                 }
                             }
@@ -488,29 +494,36 @@ public class MultipointServer {
                 }
             }
 
-            URI uri = null;
-            while ((uri = pending()) != null) {
+            synchronized (connect) {
+                while (connect.size() > 0) {
 
-                int port = uri.getPort();
-                String host = uri.getHost();
+                    URI uri = connect.removeFirst();
 
-                try {
-                    println("open " + uri);
+                    if (connections.containsKey(uri)) continue;
+
+                    int port = uri.getPort();
+                    String host = uri.getHost();
 
-                    SocketChannel socketChannel = SocketChannel.open();
-                    socketChannel.configureBlocking(false);
+                    try {
+                        println("open " + uri);
 
-                    InetSocketAddress address = new InetSocketAddress(host, port);
+                        SocketChannel socketChannel = SocketChannel.open();
+                        socketChannel.configureBlocking(false);
 
-                    socketChannel.connect(address);
+                        InetSocketAddress address = new InetSocketAddress(host, port);
 
-                    Session session = new Session(socketChannel, address, uri);
-                    session.ops(java.nio.channels.SelectionKey.OP_CONNECT);
+                        socketChannel.connect(address);
 
-                    // seen - needs to get maintained as "connected"
-                    // TODO remove from seen
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
+                        Session session = new Session(socketChannel, address, uri);
+                        session.ops(java.nio.channels.SelectionKey.OP_CONNECT);
+                        session.trace("client");
+                        connections.put(session.uri, session);
+                        
+                        // seen - needs to get maintained as "connected"
+                        // TODO remove from seen
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
                 }
             }
         }
@@ -518,7 +531,9 @@ public class MultipointServer {
 
     private ArrayList<URI> connections() {
         synchronized (connect) {
-            return new ArrayList<URI>(connections.keySet());
+            ArrayList<URI> list = new ArrayList<URI>(connections.keySet());
+            list.addAll(connect);
+            return list;
         }
     }
 
@@ -526,10 +541,19 @@ public class MultipointServer {
         Session session = (Session) key.attachment();
 
         session.state(0, State.CLOSED);
-        session.trace("closed");
-        
-        synchronized (connect) {
-            connections.remove(session.uri);
+
+        if (session.hangup) {
+            // This was a duplicate connection and was closed
+            // do not remove this URI from the 'connections'
+            // map as this particular session is not in that
+            // map -- only the good session that will not be
+            // closed is in there.
+            session.trace("hungup");
+        } else {
+            session.trace("closed");
+            synchronized (connect) {
+                connections.remove(session.uri);
+            }
         }
 
         hangup(key);
@@ -563,17 +587,11 @@ public class MultipointServer {
         }
     }
 
-    private URI pending() {
-        synchronized (connect) {
-            if (connect.size() > 0) return connect.removeFirst();
-        }
-        return null;
-    }
-
     private void connected(Session session) {
 
         synchronized (connect) {
             Session duplicate = connections.get(session.uri);
+//            Session duplicate = null;
 
             if (duplicate != null) {
                 session.trace("duplicate");
@@ -585,14 +603,45 @@ public class MultipointServer {
 
                 Session[] sessions = {session, duplicate};
                 Arrays.sort(sessions, new Comparator<Session>() {
+                    // Goal: Keep the connection with the lowest port number
+                    //
+                    // Low vs high is not very significant.  The critical
+                    // part is that both sides choose the same connection.
+                    //
+                    // Port numbers are seen on both sides.  There are two
+                    // ports (one client and one server) for each connection.
+                    //
+                    // Both sides will agree to keep the connection with the
+                    // lowest server port.  If those are the same, then both
+                    // sides will agree to keep the connection with the lowest
+                    // client port.  If those are the same, we still close a
+                    // connection and hope for the best.  If both connections
+                    // are killed we will try again next time another node
+                    // lists the server and we notice we are not connected.
+                    //
                     public int compare(Session a, Session b) {
-                        return rank(a) - rank(b);
+                        int serverRank = server(a) - server(b);
+                        if (serverRank != 0) return serverRank;
+                        return client(a) - client(b);
+                    }
+
+                    private int server(Session a) {
+                        Socket socket = a.channel.socket();
+                        return a.client? socket.getPort(): socket.getLocalPort();
+                    }
+
+                    private int client(Session a) {
+                        Socket socket = a.channel.socket();
+                        return !a.client? socket.getPort(): socket.getLocalPort();
                     }
                 });
 
                 session = sessions[0];
                 duplicate = sessions[1];
 
+                session.trace(session + "@" + session.hashCode() + " KEEP");
+                duplicate.trace(duplicate + "@" + duplicate.hashCode() + " KILL");
+
                 duplicate.hangup = true;
             }
 
@@ -602,10 +651,13 @@ public class MultipointServer {
 
     private int rank(Session session) {
         Socket socket = session.channel.socket();
+        
         return socket.getLocalPort() + socket.getPort();
     }
 
     private void println(String s) {
-        System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
+        if (s.matches(".*(Listening|DONE|KEEP|KILL)")) {
+            System.out.format("%1$tH:%1$tM:%1$tS.%1$tL - %2$s\n", System.currentTimeMillis(), s);
+        }
     }
 }

Modified: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java?rev=934657&r1=934656&r2=934657&view=diff
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java (original)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/Tracker.java Fri Apr 16 01:01:28 2010
@@ -67,6 +67,10 @@ public class Tracker {
     private Map<String, ServiceVitals> discoveredServices = new ConcurrentHashMap<String, ServiceVitals>();
     private DiscoveryListener discoveryListener;
 
+    public long getHeartRate() {
+        return heartRate;
+    }
+
     public void setDiscoveryListener(DiscoveryListener discoveryListener) {
         this.discoveryListener = discoveryListener;
     }
@@ -307,7 +311,7 @@ public class Tracker {
     public static class Builder {
         private String group = "default";
         private int maxMissedHeartbeats = 10;
-        private long heartRate = 500;
+        private long heartRate = 5000;
         // ---------------------------------
         // Listenting specific settings
         private long reconnectDelay = 1000 * 5;