You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2009/11/07 11:04:12 UTC
svn commit: r833672 -
/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
Author: jvermillard
Date: Sat Nov 7 10:04:12 2009
New Revision: 833672
URL: http://svn.apache.org/viewvc?rev=833672&view=rev
Log:
accepting client connections
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
Modified: mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java?rev=833672&r1=833671&r2=833672&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java (original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java Sat Nov 7 10:04:12 2009
@@ -21,8 +21,11 @@
import java.io.IOException;
import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.security.InvalidParameterException;
+import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -31,6 +34,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* TODO
*
@@ -39,9 +43,16 @@
public class NioSocketAcceptor extends AbstractIoAcceptor {
static final Logger LOG = LoggerFactory.getLogger(NioSocketAcceptor.class);
+
+ // timeout for the selector accepting connections
+ private static final int SELECT_ACCEPT_TIMEOUT = 1000;
+ // map of the created selection keys, mainly used for cancelling them.
private Map<SocketAddress,ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
+ // object in charge of selecting server socket for accepting client connections
+ private ClientAcceptor acceptor = new ClientAcceptor();
+
@Override
public void bind(SocketAddress... localAddress) throws IOException {
@@ -54,7 +65,10 @@
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
LOG.debug("binding address {}",address);
serverSocketChannel.socket().bind(address);
- serverSocketChannels.put(address,serverSocketChannel);
+ serverSocketChannel.configureBlocking(false);
+ serverSocketChannels.put(address,serverSocketChannel);
+ // add the server socket to the acceptor selector thread
+ acceptor.add(serverSocketChannel);
}
}
}
@@ -75,6 +89,8 @@
}
channel.socket().close();
serverSocketChannels.remove(socketAddress);
+ // remove the server socket form the selector accepting connections
+ acceptor.remove(channel);
}
}
}
@@ -85,4 +101,91 @@
unbind(socketAddress);
}
}
-}
+
+ /**
+ * Working thread accepting client connection on bound server channels.
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ *
+ */
+ private class ClientAcceptor {
+
+ // selector used for listen to accept events (when a client want to connect to the bound
+ // server port)
+ private Selector selector;
+
+ // methods using this map are synchronised, so we can use a non thread safe HashMap
+ private Map<ServerSocketChannel,SelectionKey> keyForChannels = new HashMap<ServerSocketChannel, SelectionKey>(1);
+
+ /**
+ * Add a channel to the selector,
+ * will start accepting connections.
+ * @param serverSocketChannel the channel to accept
+ */
+ public synchronized void add(ServerSocketChannel serverSocketChannel) {
+ LOG.debug("adding channel {} to the acceptor thread",serverSocketChannel.socket().getInetAddress());
+ // if no selector, we create one
+ try {
+ if (selector == null) {
+ selector = Selector.open();
+ }
+ SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ keyForChannels.put(serverSocketChannel, key);
+ } catch (IOException e) {
+ LOG.error("IOException while registering a new ServerSocketChannel",e);
+ }
+ // spawn again the worker if needed
+ if (worker == null || !worker.isAlive()) {
+ worker = new ClientAcceptorWorker();
+ worker.start();
+ }
+ }
+
+ /**
+ * Remove a channel from the selector, will stop accepting connections.
+ * @param serverSocketChannel the channel to remove
+ */
+ public synchronized void remove(ServerSocketChannel serverSocketChannel) {
+ LOG.debug("removing channel {} from the acceptor thread",serverSocketChannel.socket().getInetAddress());
+ if (selector == null || serverSocketChannel == null) {
+ throw new InvalidParameterException("serverSocketChannel");
+ }
+ // get the key and cancel it
+ SelectionKey key = keyForChannels.remove(serverSocketChannel);
+ if (key == null) {
+ throw new InvalidParameterException("serverSocketChannel");
+ }
+ key.cancel();
+ }
+
+ private ClientAcceptorWorker worker;
+
+ private class ClientAcceptorWorker extends Thread {
+
+ @Override
+ public void run() {
+ int keyCount;
+ synchronized (this) {
+ keyCount = keyForChannels.size();
+ }
+ while(keyCount >0) {
+ try {
+ int eventCount = selector.select(SELECT_ACCEPT_TIMEOUT);
+ for (int i = 0;i < eventCount; i++) {
+
+ }
+ } catch (IOException e) {
+ LOG.error("IOException while accepting connections");
+ }
+
+ }
+ // close the selector, because all the server socket was removed and unbound
+ // a selector will be reopen for the next added selector.
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.error("IOException while closing accept selector",e);
+ }
+ }
+ }
+ }
+}
\ No newline at end of file