You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tomee.apache.org by db...@apache.org on 2010/04/05 18:47:17 UTC

svn commit: r930899 - /openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java

Author: dblevins
Date: Mon Apr  5 16:47:17 2010
New Revision: 930899

URL: http://svn.apache.org/viewvc?rev=930899&view=rev
Log:
Still a work in progress, but too much time and energy in it to risk losing it.  This proof-of-concept code creates a peer-to-peer network using NIO and a single thread to read/write to all peers in the network, with no duplicate connections and the ability to discover new peers from other peers.

The ultimate goal is to develop a TCP replacement for our multicast (UDP) discovery.

Added:
    openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java   (with props)

Added: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
URL: http://svn.apache.org/viewvc/openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java?rev=930899&view=auto
==============================================================================
--- openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java (added)
+++ openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java Mon Apr  5 16:47:17 2010
@@ -0,0 +1,567 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.openejb.server.discovery;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import static java.nio.channels.SelectionKey.OP_READ;
+import static java.nio.channels.SelectionKey.OP_WRITE;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * @version $Rev$ $Date$
+ */
+public class EchoNet {
+
+    /**
+     * Demo driver: opens a plain blocking listener on port 3333, starts two
+     * NIO servers on ports 4444 and 5555, connects the first server to both
+     * of the others and then blocks forever.
+     *
+     * @param args ignored
+     * @throws Exception if a connection attempt fails or the wait is interrupted
+     */
+    public static void main(String[] args) throws Exception {
+        listen(3333);
+
+        final Server first = new Server(4444);
+        first.start();
+
+        final Server second = new Server(5555);
+        second.start();
+
+        // Wiring that is currently disabled: a third server on 6666,
+        // second -> 3333, second -> first, second -> third,
+        // third -> first and third -> second.
+        first.connect(3333);
+        first.connect(second);
+
+        // Nothing ever counts this latch down, so the main thread parks here
+        final CountDownLatch forever = new CountDownLatch(1);
+        forever.await();
+    }
+
+    private static void listen(final int port) {
+        new Thread(new Runnable() {
+            public void run() {
+                try {
+                    runServer(port);
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }).start();
+    }
+
+    /**
+     * Binds a blocking {@link ServerSocket} to the given port and accepts
+     * connections in an endless loop.
+     *
+     * Accepted connections are neither read nor closed here; the
+     * {@code read(port, socket)} helper exists but is not wired in.
+     *
+     * @param port TCP port to bind
+     * @throws IOException if the socket cannot be bound or accepting fails;
+     *                     the server socket is closed before the exception
+     *                     leaves this method
+     */
+    private static void runServer(final int port) throws IOException {
+        final ServerSocket serverSocket = new ServerSocket(port);
+        try {
+            while (true) {
+                serverSocket.accept();
+            }
+        } finally {
+            // The loop only ends by exception: release the port on the way out
+            try {
+                serverSocket.close();
+            } catch (IOException ignored) {
+                // the accept failure is the interesting error, keep it
+            }
+        }
+    }
+
+    /**
+     * Drains the socket's input stream until end-of-stream, terminating
+     * each chunk read with a newline inside the local buffer.
+     *
+     * @param port   port of the listener that accepted the socket (unused
+     *               while the debug output is disabled)
+     * @param socket connection to read from
+     * @throws IOException if reading from the socket fails
+     */
+    private static void read(int port, Socket socket) throws IOException {
+//        System.out.println(port + " - accept = " + socket.getRemoteSocketAddress());
+        InputStream in = socket.getInputStream();
+        byte[] buffer = new byte[1024];
+        int length;
+        // Read at most length - 1 bytes so that the newline appended below
+        // always fits, even when a read fills the buffer
+        while ((length = in.read(buffer, 0, buffer.length - 1)) != -1) {
+            buffer[length++] = (byte) '\n';
+//            System.out.write(buffer, 0, length);
+        }
+    }
+
+    public static class Server {
+        private int port;
+        private Selector selector;
+
+        private final URI me;
+
+        /**
+         * Creates a server that listens on the given port: opens a
+         * non-blocking {@link ServerSocketChannel}, binds it, opens the
+         * selector and registers the channel for {@code OP_ACCEPT}.
+         * The selector loop itself is not started until {@link #start()}.
+         *
+         * @param port TCP port to listen on; also used to build this
+         *             server's own URI, {@code conn://localhost:<port>}
+         * @throws IllegalStateException if the channel or selector cannot be
+         *                               opened, bound or registered
+         */
+        public Server(int port) {
+            this.port = port;
+
+            me = URI.create("conn://localhost:" + port);
+
+            ServerSocketChannel serverChannel = null;
+            try {
+                serverChannel = ServerSocketChannel.open();
+                ServerSocket ss = serverChannel.socket();
+                InetSocketAddress address = new InetSocketAddress(port);
+                ss.bind(address);
+                serverChannel.configureBlocking(false);
+
+                selector = Selector.open();
+
+                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+                println("Listening");
+
+            } catch (IOException ex) {
+                // Release whatever was opened, then fail fast: a Server
+                // without a selector cannot run and would only fail later
+                // with a NullPointerException
+                if (selector != null) {
+                    try {
+                        selector.close();
+                    } catch (IOException ignored) {
+                        // best effort cleanup
+                    }
+                }
+                if (serverChannel != null) {
+                    try {
+                        serverChannel.close();
+                    } catch (IOException ignored) {
+                        // best effort cleanup
+                    }
+                }
+                throw new IllegalStateException("Cannot listen on port " + port, ex);
+            }
+        }
+
+        /**
+         * Launches the selector loop on a new thread named
+         * {@code Server.<port>}.
+         *
+         * @return this server, so that construction and start can be chained
+         */
+        public Server start() {
+            final Runnable loop = new Runnable() {
+                public void run() {
+                    _run();
+                }
+            };
+
+            new Thread(loop, "Server." + port).start();
+            return this;
+        }
+
+
+        private void printOps(SelectionKey key) {
+            StringBuilder sb = new StringBuilder(key.hashCode() + "  opts: ");
+            if (key.isAcceptable()) sb.append("a");
+            if (key.isConnectable()) sb.append("c");
+            if (key.isReadable()) sb.append("r");
+            if (key.isWritable()) sb.append("w");
+            println(sb.toString());
+        }
+
+        /**
+         * Per-connection state, attached to the connection's selection key.
+         */
+        public class Session {
+            /** Outbound buffers queued by {@code push}, consumed by {@link #pop()}. */
+            private final LinkedList<ByteBuffer> buffers = new LinkedList<ByteBuffer>();
+
+            /** Scratch buffer for inbound data. */
+            private final ByteBuffer read = ByteBuffer.allocate(1024);
+            /** Scratch buffer for outbound data. */
+            private final ByteBuffer write = ByteBuffer.allocate(1024);
+
+            /** Selection key of the channel this session belongs to. */
+            public SelectionKey key;
+
+            /** Socket address of the remote end. */
+            public InetSocketAddress address;
+
+            /** Current protocol phase; every session starts in GREETING. */
+            private State state = State.GREETING;
+            /** Whether this side accepted the connection, as passed to the constructor. */
+            private final boolean server;
+            /** URIs still to be written to the peer during LISTING. */
+            private Iterator<URI> listing;
+            /** URIs the peer has sent during LISTING. */
+            private List<URI> listed = new ArrayList<URI>();
+            /** URI identifying the peer. */
+            public URI uri;
+
+            public Session(boolean server) {
+                this.server = server;
+            }
+
+            /**
+             * Removes and returns the oldest queued buffer. Once the queue is
+             * empty the key's interest set is reduced to {@code OP_READ},
+             * including the case where the queue was already empty and the
+             * removal throws.
+             */
+            public ByteBuffer pop() {
+                try {
+                    return buffers.removeFirst();
+                } finally {
+                    if (buffers.isEmpty()) {
+                        key.interestOps(OP_READ);
+                    }
+                }
+            }
+
+            /** Prints a trace line for this session; delegates to {@link #trace(String)}. */
+            private void println(String s) {
+                trace(s);
+//                System.out.println(port + " - " + uri.getPort() + " - " + s);
+            }
+
+            /**
+             * Moves the session to the given protocol phase, replaces the
+             * key's interest set and traces the transition with a "*".
+             */
+            public void state(int ops, State state) {
+                this.state = state;
+                key.interestOps(ops);
+
+                trace("*");
+            }
+
+            /**
+             * Writes one line to standard out in the form
+             * {@code <local port> <interest flags> <peer port> <state> <text>},
+             * where the flags are "<" for read interest and ">" for write
+             * interest.
+             */
+            private void trace(String str) {
+                final int interest = key.interestOps();
+
+                final StringBuilder line = new StringBuilder();
+                line.append(port).append(' ');
+                if ((interest & OP_READ) != 0) {
+                    line.append('<');
+                }
+                if ((interest & OP_WRITE) != 0) {
+                    line.append('>');
+                }
+                line.append(' ').append(uri.getPort());
+                line.append(' ').append(this.state);
+                line.append(' ').append(str);
+
+                System.out.println(line);
+            }
+
+        }
+
+        /**
+         * Protocol phases of a {@link Session}.
+         */
+        private enum State {
+            /** Initial phase: the connecting side writes its own URI, the accepting side reads it. */
+            GREETING,
+            /** The two sides take turns sending the peer URIs they know, one URI per write. */
+            LISTING,
+            /** Steady state: every write sends this server's URI ("ping"), every read is logged ("pong"). */
+            HEARTBEAT
+        }
+
+        List<URI> seen = new ArrayList<URI>();
+
+        /**
+         * Selector loop: once per second, waits for ready keys and dispatches
+         * each one to the accept / connect / read / write handler.
+         *
+         * A connection that fails with an {@link IOException}, reaches
+         * end-of-stream or sends something that is not a URI is closed and
+         * the loop carries on with the remaining peers. The loop itself ends
+         * only when the selector fails or the thread is interrupted.
+         */
+        private void _run() {
+            while (true) {
+
+                try {
+//                    if (me.getPort() != 5555) Thread.sleep(1000);
+                    // throttles the loop to one selection pass per second
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // Keep the interrupt visible to whoever owns the thread
+                    // and stop serving instead of silently dropping it
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+
+                try {
+                    selector.select();
+//                    selector.select(2000);
+                } catch (IOException ex) {
+                    ex.printStackTrace();
+                    break;
+                }
+
+                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
+                while (iterator.hasNext()) {
+                    SelectionKey key = iterator.next();
+                    iterator.remove();
+
+                    if (!key.isValid()) continue;
+
+                    try {
+                        if (key.isAcceptable()) acceptPeer(key);
+
+                        if (key.isConnectable()) finishConnect(key);
+
+                        if (key.isReadable()) readFrom(key);
+
+                        // readFrom closes the connection on end-of-stream,
+                        // so the key may no longer be valid here
+                        if (key.isValid() && key.isWritable()) writeTo(key);
+
+                    } catch (IOException ex) {
+                        closeQuietly(key);
+                    } catch (IllegalArgumentException ex) {
+                        // URI.create rejects what the peer sent: drop this
+                        // peer rather than letting the exception end the loop
+                        println("dropping connection, unreadable message - " + ex.getMessage());
+                        closeQuietly(key);
+                    }
+
+                }
+
+            }
+        }
+
+        /**
+         * We are a server: accepts the pending connection and registers it
+         * for reading.
+         *
+         * When we are a server we must first listen for the address of the
+         * client before sending data. Once they send us their address, we
+         * send our full list of known addresses, followed by their address
+         * to signal that it is their turn. Afterward we only pulse our
+         * heartbeat.
+         */
+        private void acceptPeer(SelectionKey key) throws IOException {
+            ServerSocketChannel server = (ServerSocketChannel) key.channel();
+            SocketChannel client = server.accept();
+
+            // a non-blocking accept returns null when nothing is pending
+            if (client == null) return;
+
+            try {
+                InetSocketAddress address = (InetSocketAddress) client.socket().getRemoteSocketAddress();
+
+                println("accept " + address.getPort());
+                client.configureBlocking(false);
+
+                register(client, address, OP_READ, true);
+            } catch (IOException ex) {
+                // Only this client is broken; closing it here keeps the
+                // caller from closing the listening channel instead
+                try {
+                    client.close();
+                } catch (IOException ignored) {
+                    // nothing more to do for this client
+                }
+            }
+        }
+
+        /**
+         * We are a client: completes the connection and switches the key to
+         * write interest so that the greeting goes out first.
+         *
+         * When we are a client, we first say hi before accepting data. Once
+         * a server reads our address, it sends its full list of known
+         * addresses, followed by our address to signal that it is done. We
+         * initiate connections to everyone in the list who we have not yet
+         * seen. Afterward the server only pulses its heartbeat.
+         */
+        private void finishConnect(SelectionKey key) throws IOException {
+            SocketChannel channel = (SocketChannel) key.channel();
+            channel.finishConnect();
+
+            key.interestOps(OP_WRITE);
+        }
+
+        /**
+         * Reads one message from the peer and advances the session according
+         * to its current state. The connection is closed when the peer has
+         * reached end-of-stream.
+         *
+         * @throws IllegalArgumentException if the message is not a valid URI
+         */
+        private void readFrom(SelectionKey key) throws IOException {
+            SocketChannel client = (SocketChannel) key.channel();
+            Session session = (Session) key.attachment();
+
+            ByteBuffer input = session.read;
+
+            input.clear();
+
+            int count = client.read(input);
+
+            if (count <= 0) {
+                session.println(" --- ");
+                // -1 is end-of-stream: the key would stay readable forever,
+                // so close this connection; the other peers are unaffected
+                if (count < 0) closeQuietly(key);
+                return;
+            }
+
+            String message = new String(input.array(), input.arrayOffset(), input.position());
+
+            URI uri = URI.create(message);
+
+            switch (session.state) {
+                case GREETING: { // read
+                    session.uri = uri;
+
+                    session.println("welcome");
+
+                    session.state(OP_WRITE, State.LISTING);
+
+                    ArrayList<URI> list = new ArrayList<URI>(seen);
+
+                    // When they read themselves on the list
+                    // they'll know it's time to list their URIs
+
+                    list.remove(me); // yank
+                    list.remove(uri); // yank
+                    list.add(uri); // add to the end
+
+                    session.listing = list.iterator();
+
+                    break;
+                }
+
+                case LISTING: { // read
+
+                    session.listed.add(uri);
+
+                    session.println(message);
+
+                    // they listed me, means they want my list
+                    if (uri.equals(me)) {
+
+                        // switch to write
+                        session.state(OP_WRITE, State.LISTING);
+
+                        ArrayList<URI> list = new ArrayList<URI>(seen);
+
+                        for (URI reported : session.listed) {
+                            list.remove(reported);
+                        }
+
+                        // When they read us on the list
+                        // they'll know it's time to switch to heartbeat
+
+                        list.remove(session.uri);
+                        list.remove(me); // yank if in the middle
+                        list.add(me); // add to the end
+
+                        session.listing = list.iterator();
+                    } else if (uri.equals(session.uri)) {
+
+                        session.state(OP_WRITE | OP_READ, State.HEARTBEAT);
+
+                    } else if (!seen.contains(uri)) {
+                        try {
+                            connect(uri);
+                        } catch (Exception e) {
+                            println("connect failed " + uri + " - " + e.getMessage());
+                            e.printStackTrace();
+                        }
+                    } else {
+                        session.println("ambiguous");
+
+                        session.state(OP_WRITE | OP_READ, State.HEARTBEAT);
+                    }
+
+                    break;
+                }
+
+                case HEARTBEAT: { // read
+
+                    session.println("pong");
+
+                    break;
+                }
+            }
+        }
+
+        /**
+         * Writes the next message the session's current state calls for:
+         * our own URI as greeting or heartbeat, or the next URI of the
+         * listing.
+         */
+        private void writeTo(SelectionKey key) throws IOException {
+            SocketChannel client = (SocketChannel) key.channel();
+            Session session = (Session) key.attachment();
+
+            switch (session.state) {
+                case GREETING: { // write
+
+                    session.println("hello");
+
+                    send(client, session, me.toString());
+
+                    session.state(OP_READ, State.LISTING);
+
+                    break;
+                }
+
+                case LISTING: { // write
+
+                    URI uri = session.listing.next();
+
+                    session.println(uri.toString());
+
+                    send(client, session, uri.toString());
+
+                    if (!session.listing.hasNext()) {
+
+                        // We've just signaled them to
+                        // go next and list their URIs
+                        if (uri.equals(session.uri)) {
+
+                            session.state(OP_READ, State.LISTING);
+
+                        } else if (uri.equals(me)) {
+                            // we've clearly signaled them that
+                            // we are done and do not expect
+                            // to read any URLs
+
+                            session.state(OP_WRITE | OP_READ, State.HEARTBEAT);
+
+                        } else {
+
+                            session.println("ambiguous state, switching to heartbeat");
+
+                            session.state(OP_WRITE | OP_READ, State.HEARTBEAT);
+
+                        }
+                    }
+
+                    break;
+                }
+
+                case HEARTBEAT: { // write
+
+                    send(client, session, me.toString());
+
+                    session.println("ping");
+
+                    break;
+                }
+            }
+        }
+
+        /**
+         * Copies the text into the session's write buffer and writes it to
+         * the channel with a single {@code write} call.
+         */
+        private void send(SocketChannel client, Session session, String text) throws IOException {
+            ByteBuffer output = session.write;
+
+            output.clear();
+
+            output.put(text.getBytes());
+
+            output.flip();
+
+            client.write(output);
+        }
+
+        /**
+         * Cancels the key and closes its channel, ignoring a failure to close.
+         */
+        private void closeQuietly(SelectionKey key) {
+            key.cancel();
+            try {
+                key.channel().close();
+            } catch (IOException ignored) {
+                // the connection is being discarded anyway
+            }
+        }
+
+        private void push(ByteBuffer output, Session session) {
+            for (Session fellow : clients.values()) {
+                // Don't write to the originator
+                if (fellow == session) continue;
+                fellow.buffers.addLast(output);
+                fellow.key.interestOps(OP_READ | OP_WRITE);
+            }
+        }
+
+
+        /**
+         * Connects to another in-process server via its port on localhost.
+         *
+         * @param s server to connect to
+         * @throws Exception if the connection cannot be initiated
+         */
+        public void connect(Server s) throws Exception {
+            connect(s.port);
+        }
+
+        /**
+         * Connects to {@code conn://localhost:<port>}.
+         *
+         * @param port port of the peer on localhost
+         * @throws Exception if the connection cannot be initiated
+         */
+        public void connect(int port) throws Exception {
+            connect(URI.create("conn://localhost:" + port));
+        }
+
+        /**
+         * Starts a non-blocking connection to the given peer, registers it
+         * with the selector and records the URI in {@code seen}. Does
+         * nothing when the URI is this server's own.
+         *
+         * @param uri peer to connect to; host and port are taken from it
+         * @throws RuntimeException wrapping the {@link IOException} if the
+         *                          channel cannot be opened, connected or
+         *                          registered
+         * @throws Exception declared for callers; see above
+         */
+        public void connect(URI uri) throws Exception {
+            if (me.equals(uri)) return;
+
+            int port = uri.getPort();
+            String host = uri.getHost();
+
+            SocketChannel socketChannel = null;
+            boolean registered = false;
+            try {
+                println("open " + uri);
+
+                socketChannel = SocketChannel.open();
+                socketChannel.configureBlocking(false);
+
+                InetSocketAddress address = new InetSocketAddress(host, port);
+
+                // A local connection may be established immediately; in that
+                // case OP_CONNECT is never signalled, so go straight to the
+                // write interest that the connect handler would have set
+                boolean connected = socketChannel.connect(address);
+                int ops = connected ? OP_WRITE : SelectionKey.OP_CONNECT;
+
+                Session session = register(socketChannel, address, ops, false);
+                session.uri = uri;
+                registered = true;
+
+                // seen - needs to get maintained as "connected"
+                // TODO remove from seen
+                // Kept free of duplicates: the listing code removes a URI
+                // with List.remove, which only drops the first occurrence
+                if (!seen.contains(uri)) seen.add(uri);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } finally {
+                // Do not leak the channel when anything above failed
+                if (!registered && socketChannel != null) {
+                    try {
+                        socketChannel.close();
+                    } catch (IOException ignored) {
+                        // the original failure is what gets reported
+                    }
+                }
+            }
+//            println("++ " + port);
+        }
+
+        private final Map<SocketAddress, Session> clients = new ConcurrentHashMap<SocketAddress, Session>();
+//        private final List<Session> clients = new ArrayList<Session>();
+
+        /**
+         * Creates a {@link Session} for the channel and registers the channel
+         * with the selector.
+         *
+         * The channel is first registered with an empty interest set, and the
+         * requested operations are enabled only after the session's
+         * {@code key}, {@code address} and {@code uri} have been filled in.
+         * This method is also reached from threads other than the selector
+         * thread (via {@code connect}), so the selector loop must not be
+         * handed a session that is only partly initialised.
+         *
+         * @param client  non-blocking channel to register
+         * @param address remote address of the channel
+         * @param ops     interest set to enable once the session is ready
+         * @param server  whether this side accepted the connection
+         * @return the session attached to the new selection key
+         * @throws IOException if the channel cannot be registered
+         */
+        private Session register(SocketChannel client, InetSocketAddress address, int ops, boolean server) throws IOException {
+//            println("Registering " + address);
+            Session session = new Session(server);
+            session.address = address;
+            session.uri = URI.create("conn://" + address.getHostName() + ":" + address.getPort());
+
+            // interest set 0: the key cannot be selected yet
+            session.key = client.register(selector, 0, session);
+            session.key.interestOps(ops);
+
+//            clients.add(session);
+//            Session duplicate = clients.put(address, session);
+//            if (duplicate != null) {
+//                println("duplicate = " + duplicate + "  " + address);
+//                duplicate.key.channel().close();
+//                println("closed duplicate " + address);
+//            }
+//
+//            for (SocketAddress node : clients.keySet()) {
+//                println("node " + node);
+//            }
+
+            return session;
+        }
+
+        private void println(String s) {
+            System.out.println(port + " - " + s);
+        }
+    }
+}
+

Propchange: openejb/trunk/openejb3/server/openejb-multicast/src/main/java/org/apache/openejb/server/discovery/EchoNet.java
------------------------------------------------------------------------------
    svn:eol-style = native