You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2009/11/07 11:04:12 UTC

svn commit: r833672 - /mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java

Author: jvermillard
Date: Sat Nov  7 10:04:12 2009
New Revision: 833672

URL: http://svn.apache.org/viewvc?rev=833672&view=rev
Log:
accepting client connections

Modified:
    mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java

Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java?rev=833672&r1=833671&r2=833672&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java Sat Nov  7 10:04:12 2009
@@ -21,8 +21,11 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.security.InvalidParameterException;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +34,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * TODO 
  * 
@@ -39,9 +43,16 @@
 public class NioSocketAcceptor extends AbstractIoAcceptor {
     
     static final Logger LOG = LoggerFactory.getLogger(NioSocketAcceptor.class);
+    
+    // timeout for the selector accepting connections
+    private static final int SELECT_ACCEPT_TIMEOUT = 1000;
 
+    // map of the bound server socket channels, keyed by their local address; used for unbinding them.
     private Map<SocketAddress,ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
     
+    // object in charge of selecting server socket for accepting client connections
+    private ClientAcceptor acceptor = new ClientAcceptor();
+    
     @Override
     public void bind(SocketAddress... localAddress) throws IOException {
    
@@ -54,7 +65,10 @@
                 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                 LOG.debug("binding address {}",address);
                 serverSocketChannel.socket().bind(address);
-                serverSocketChannels.put(address,serverSocketChannel); 
+                serverSocketChannel.configureBlocking(false);
+                serverSocketChannels.put(address,serverSocketChannel);
+                // add the server socket to the acceptor selector thread
+                acceptor.add(serverSocketChannel);
             }
         }
     }
@@ -75,6 +89,8 @@
                 }
                 channel.socket().close();
                 serverSocketChannels.remove(socketAddress);
+                // remove the server socket from the selector accepting connections
+                acceptor.remove(channel);
             }
         }
     }
@@ -85,4 +101,91 @@
             unbind(socketAddress);
         }
     }
-}
+    
+    /**
+     * Accepts client connections on the bound server channels, using a worker thread.
+     * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+     *
+     */
+    private class ClientAcceptor {
+        
+        // selector used to listen for accept events (raised when a client wants to connect
+        // to a bound server port)
+        private Selector selector;
+        
+        // methods using this map are synchronised, so we can use a non thread safe HashMap
+        private Map<ServerSocketChannel,SelectionKey> keyForChannels = new HashMap<ServerSocketChannel, SelectionKey>(1);
+        
+        /**
+         * Add a channel to the selector, 
+         * will start accepting connections.
+         * @param serverSocketChannel the channel to accept
+         */
+        public synchronized void add(ServerSocketChannel serverSocketChannel) {
+            LOG.debug("adding channel {} to the acceptor thread",serverSocketChannel.socket().getInetAddress());
+            // if no selector, we create one
+            try {
+                if (selector == null) {
+                    selector = Selector.open();
+                }
+                SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+                keyForChannels.put(serverSocketChannel, key);
+            } catch (IOException e) {
+                LOG.error("IOException while registering a new ServerSocketChannel",e);
+            }
+            // respawn the worker thread if it is missing or no longer alive
+            if (worker == null || !worker.isAlive()) {
+                worker = new ClientAcceptorWorker();
+                worker.start();
+            }
+        }
+        
+        /**
+         * Remove a channel from the selector, will stop accepting connections.
+         * @param serverSocketChannel the channel to remove
+         */
+        public synchronized void remove(ServerSocketChannel serverSocketChannel) {
+            LOG.debug("removing channel {} from the acceptor thread",serverSocketChannel.socket().getInetAddress());
+            if (selector == null || serverSocketChannel == null) {
+                throw new InvalidParameterException("serverSocketChannel");
+            }
+            // get the key and cancel it
+            SelectionKey key = keyForChannels.remove(serverSocketChannel);
+            if (key == null) {
+                throw new InvalidParameterException("serverSocketChannel");
+            }
+            key.cancel();
+        }
+ 
+        private ClientAcceptorWorker worker;
+        
+        private class ClientAcceptorWorker extends Thread {
+            
+            @Override
+            public void run() {
+                int keyCount;
+                synchronized (this) {
+                    keyCount = keyForChannels.size();
+                }
+                while(keyCount >0) {
+                    try {
+                        int eventCount = selector.select(SELECT_ACCEPT_TIMEOUT);
+                        for (int i = 0;i < eventCount; i++) {
+                            
+                        }
+                    } catch (IOException e) {
+                        LOG.error("IOException while accepting connections");
+                    }
+                    
+                }
+                // close the selector, because all the server sockets were removed and unbound;
+                // a selector will be reopened for the next added channel.
+                try {
+                    selector.close();
+                } catch (IOException e) {
+                    LOG.error("IOException while closing accept selector",e);
+                }
+            }                
+        }
+    }
+}
\ No newline at end of file