You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/05/22 17:02:29 UTC

svn commit: r1341509 - in /mina/trunk: core/src/main/java/org/apache/mina/service/ core/src/main/java/org/apache/mina/transport/tcp/ core/src/main/java/org/apache/mina/transport/tcp/nio/ core/src/main/java/org/apache/mina/transport/udp/ core/src/main/j...

Author: jvermillard
Date: Tue May 22 15:02:28 2012
New Revision: 1341509

URL: http://svn.apache.org/viewvc?rev=1341509&view=rev
Log:
removed ugly selector factory, preparing for UDP server

Removed:
    mina/trunk/core/src/main/java/org/apache/mina/service/SelectorFactory.java
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java
    mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/service/OneThreadSelectorStrategy.java Tue May 22 15:02:28 2012
@@ -19,9 +19,6 @@
  */
 package org.apache.mina.service;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-
 import org.apache.mina.api.IoSession;
 
 /**
@@ -31,36 +28,28 @@ import org.apache.mina.api.IoSession;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  *
  */
-public class OneThreadSelectorStrategy implements SelectorStrategy {
+public class OneThreadSelectorStrategy<PROCESSOR extends SelectorProcessor> implements SelectorStrategy<PROCESSOR> {
     /** The processor in charge of the messages processing */
-    private SelectorProcessor processor;
+    private final PROCESSOR processor;
 
-    /**
-     * Creates an instance of the OneThreadSelectorStrategy class
-     * @param selectorFactory The Selector factory to use to create the processor
-     */
-    public OneThreadSelectorStrategy(SelectorFactory selectorFactory) {
-        this.processor = selectorFactory.getNewSelector("uniqueSelector", this);
+    public OneThreadSelectorStrategy(PROCESSOR processor) {
+    	processor.setStrategy(this);
+    	this.processor = processor;
     }
 
     @Override
-    public SelectorProcessor getSelectorForBindNewAddress() {
+    public PROCESSOR getSelectorForBindNewAddress() {
         return processor;
     }
 
     @Override
-    public SelectorProcessor getSelectorForNewSession(SelectorProcessor acceptingProcessor) {
+    public PROCESSOR getSelectorForNewSession(SelectorProcessor acceptingProcessor) {
         return processor;
     }
 
     @Override
-    public SelectorProcessor getSelectorForWrite(IoSession session) {
+    public PROCESSOR getSelectorForWrite(IoSession session) {
         return processor;
     }
 
-    @Override
-    public void unbind(SocketAddress address) throws IOException {
-        processor.unbind(address);
-    }
-
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java Tue May 22 15:02:28 2012
@@ -23,10 +23,11 @@ package org.apache.mina.service;
 import java.io.IOException;
 import java.net.SocketAddress;
 
-import org.apache.mina.api.IoServer;
 import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSession;
 import org.apache.mina.session.AbstractIoSession;
+import org.apache.mina.transport.tcp.AbstractTcpServer;
+import org.apache.mina.transport.udp.AbstractUdpServer;
 
 /**
  * A processor in charge of a group of client session and server sockets.
@@ -36,6 +37,9 @@ import org.apache.mina.session.AbstractI
  */
 public interface SelectorProcessor {
 
+	
+	void setStrategy(SelectorStrategy<?> strategy);
+	
     /**
      * create a session for a freshly accepted client socket
      * @param service
@@ -44,11 +48,20 @@ public interface SelectorProcessor {
     void createSession(IoService service, Object clientSocket) throws IOException;
 
     /**
-     * Bind and start processing this new server address
+     * Bind and start processing this new server TCP address
+     * @param server the server for the new address
      * @param address local address to bind
      * @throws IOException exception thrown if any problem occurs while binding
      */
-    void bindAndAcceptAddress(IoServer server, SocketAddress address) throws IOException;
+    void bindTcpServer(AbstractTcpServer server, SocketAddress address) throws IOException;
+
+    /**
+     * Bind and start processing this new server UDP address
+     * @param server the server for the new address
+     * @param address local address to bind
+     * @throws IOException exception thrown if any problem occurs while binding
+     */
+    void bindUdpServer(AbstractUdpServer server, SocketAddress address) throws IOException;
 
     /**
      * Stop processing and unbind this server address

Modified: mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/service/SelectorStrategy.java Tue May 22 15:02:28 2012
@@ -19,23 +19,20 @@
  */
 package org.apache.mina.service;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-
 import org.apache.mina.api.IoSession;
 
 /**
  * Strategy for balancing server socket and client socket to different selecting/polling threads.
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-public interface SelectorStrategy {
+public interface SelectorStrategy<PROCESSOR extends SelectorProcessor> {
 
     /**
      * Provide a {@link SelectorProcessor} for a newly accepted {@link IoSession}.
      * @param acceptingProcessor the selector which accepted the {@link IoSession}
      * @return a processor for processing the new session
      */
-    SelectorProcessor getSelectorForNewSession(SelectorProcessor acceptingProcessor);
+    PROCESSOR getSelectorForNewSession(SelectorProcessor acceptingProcessor);
 
     /**
      * Provide a {@link SelectorProcessor} for a {@link IoSession} which need to write data.
@@ -44,19 +41,13 @@ public interface SelectorStrategy {
      * @param session the session in need of writing
      * @return the selector processor for handling this session write events
      */
-    SelectorProcessor getSelectorForWrite(IoSession session);
+    PROCESSOR getSelectorForWrite(IoSession session);
 
     /**
      * Provide a {@link SelectorProcessor} for processing a newly bound address.
      * The processor will accept the incoming connections.
      * @return a {@link SelectorProcessor} for processing a newly bound address
      */
-    SelectorProcessor getSelectorForBindNewAddress();
+    PROCESSOR getSelectorForBindNewAddress();
 
-    /**
-     * Unbind an address and remove it from its {@link SelectorProcessor} 
-     * @param address the address to be unbound and removed
-     * @throws IOException thrown if any problem occurs while unbinding
-     */
-    void unbind(SocketAddress address) throws IOException;
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java Tue May 22 15:02:28 2012
@@ -25,6 +25,7 @@ import static org.apache.mina.api.IoSess
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
@@ -54,6 +55,7 @@ import org.apache.mina.session.AbstractI
 import org.apache.mina.session.DefaultWriteFuture;
 import org.apache.mina.session.SslHelper;
 import org.apache.mina.session.WriteRequest;
+import org.apache.mina.transport.udp.AbstractUdpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,551 +66,618 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
 public class NioSelectorProcessor implements SelectorProcessor {
-    /** A logger for this class */
-    private static final Logger LOGGER = LoggerFactory.getLogger(NioSelectorProcessor.class);
-
-    /**
-     * A timeout used for the select, as we need to get out to deal with idle
-     * sessions
-     */
-    private static final long SELECT_TIMEOUT = 1000L;
-
-    private final SelectorStrategy strategy;
-
-    private final Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
-
-    /** Read buffer for all the incoming bytes (default to 64Kb) */
-    private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
-
-    /** the thread polling and processing the I/O events */
-    private SelectorWorker worker = null;
-
-    /** helper for detecting idleing sessions */
-    private final IdleChecker idleChecker = new IndexedIdleChecker();
-
-    /** A queue containing the servers to bind to this selector */
-    private final Queue<Object[]> serversToAdd = new ConcurrentLinkedQueue<Object[]>();
-
-    /** server to remove of the selector */
-    private final Queue<ServerSocketChannel> serversToRemove = new ConcurrentLinkedQueue<ServerSocketChannel>();
-
-    /** new session freshly accepted, placed here for being added to the selector */
-    private final Queue<NioTcpSession> sessionsToConnect = new ConcurrentLinkedQueue<NioTcpSession>();
-
-    /** session to be removed of the selector */
-    private final Queue<NioTcpSession> sessionsToClose = new ConcurrentLinkedQueue<NioTcpSession>();
-
-    /** A queue used to store the sessions to be flushed */
-    private final Queue<NioTcpSession> flushingSessions = new ConcurrentLinkedQueue<NioTcpSession>();
-
-    private Selector selector;
-
-    // Lock for Selector worker, using default. can look into fairness later.
-    // We need to think about a lock less mechanism here.
-    private final Lock workerLock = new ReentrantLock();
-
-    public NioSelectorProcessor(final String name, final SelectorStrategy strategy) {
-        this.strategy = strategy;
-    }
-
-    /**
-     * Add a bound server channel for starting accepting new client connections.
-     * 
-     * @param serverChannel
-     */
-    private void add(final ServerSocketChannel serverChannel, final IoServer server) {
-        LOGGER.debug("adding a server channel {} for server {}", serverChannel, server);
-        this.serversToAdd.add(new Object[] { serverChannel, server });
-        this.wakeupWorker();
-    }
-
-    /**
-     * Wake the I/O worker thread and if none exists, create a new one
-     * FIXME : too much locking there ?
-     */
-    private void wakeupWorker() {
-        this.workerLock.lock();
-        try {
-            if (this.worker == null) {
-                this.worker = new SelectorWorker();
-                this.worker.start();
-            }
-        } finally {
-            this.workerLock.unlock();
-        }
-
-        if (this.selector != null) {
-            this.selector.wakeup();
-        }
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void bindAndAcceptAddress(final IoServer server, final SocketAddress address) throws IOException {
-        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
-
-        // FIXME : should be "genericified"
-        if (server instanceof AbstractTcpServer) {
-            serverSocketChannel.socket().setReuseAddress(((AbstractTcpServer) server).isReuseAddress());
-        }
-        serverSocketChannel.socket().bind(address);
-        serverSocketChannel.configureBlocking(false);
-        this.serverSocketChannels.put(address, serverSocketChannel);
-        this.add(serverSocketChannel, server);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void unbind(final SocketAddress address) throws IOException {
-        ServerSocketChannel channel = this.serverSocketChannels.get(address);
-        channel.socket().close();
-        channel.close();
-        if (this.serverSocketChannels.remove(address) == null) {
-            LOGGER.warn("The server channel for address {} was already unbound", address);
-        }
-        LOGGER.debug("Removing a server channel {}", channel);
-        this.serversToRemove.add(channel);
-        this.wakeupWorker();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void createSession(final IoService service, final Object clientSocket) throws SSLException {
-        LOGGER.debug("create session");
-        final SocketChannel socketChannel = (SocketChannel) clientSocket;
-        final TcpSessionConfig config = (TcpSessionConfig) service.getSessionConfig();
-        final NioTcpSession session = new NioTcpSession(service, socketChannel,
-                this.strategy.getSelectorForNewSession(this));
-
-        try {
-            socketChannel.configureBlocking(false);
-        } catch (IOException e) {
-            LOGGER.error("Unexpected exception, while configuring socket as non blocking", e);
-            throw new RuntimeIoException("cannot configure socket as non-blocking", e);
-        }
-        // apply idle configuration
-        session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
-        session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
-                config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
-
-        // apply the default service socket configuration
-        Boolean keepAlive = config.isKeepAlive();
-
-        if (keepAlive != null) {
-            session.getConfig().setKeepAlive(keepAlive);
-        }
-
-        Boolean oobInline = config.isOobInline();
-
-        if (oobInline != null) {
-            session.getConfig().setOobInline(oobInline);
-        }
-
-        Boolean reuseAddress = config.isReuseAddress();
-
-        if (reuseAddress != null) {
-            session.getConfig().setReuseAddress(reuseAddress);
-        }
-
-        Boolean tcpNoDelay = config.isTcpNoDelay();
-
-        if (tcpNoDelay != null) {
-            session.getConfig().setTcpNoDelay(tcpNoDelay);
-        }
-
-        Integer receiveBufferSize = config.getReceiveBufferSize();
-
-        if (receiveBufferSize != null) {
-            session.getConfig().setReceiveBufferSize(receiveBufferSize);
-        }
-
-        Integer sendBufferSize = config.getSendBufferSize();
-
-        if (sendBufferSize != null) {
-            session.getConfig().setSendBufferSize(sendBufferSize);
-        }
-
-        Integer trafficClass = config.getTrafficClass();
-
-        if (trafficClass != null) {
-            session.getConfig().setTrafficClass(trafficClass);
-        }
-
-        Integer soLinger = config.getSoLinger();
-
-        if (soLinger != null) {
-            session.getConfig().setSoLinger(soLinger);
-        }
-
-        // Set the secured flag if the service is to be used over SSL/TLS
-        if (config.isSecured()) {
-            session.initSecure(config.getSslContext());
-        }
-
-        // event session created
-        session.processSessionCreated();
-
-        // add the session to the queue for being added to the selector
-        this.sessionsToConnect.add(session);
-        this.wakeupWorker();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void flush(final AbstractIoSession session) {
-        LOGGER.debug("scheduling session {} for writing", session);
-        // add the session to the list of session to be registered for writing
-        this.flushingSessions.add((NioTcpSession) session);
-        // wake the selector for unlocking the I/O thread
-        this.wakeupWorker();
-    }
-
-    /**
-     * The worker processing incoming session creation, session destruction requests, session write and reads.
-     * It will also bind new servers.
-     */
-    private class SelectorWorker extends Thread {
-        // map for finding the keys associated with a given server
-        private final Map<ServerSocketChannel, SelectionKey> serverKey = new HashMap<ServerSocketChannel, SelectionKey>();
-
-        // map for finding read keys associated with a given session
-        private final Map<NioTcpSession, SelectionKey> sessionReadKey = new HashMap<NioTcpSession, SelectionKey>();
-
-        @Override
-        public void run() {
-            try {
-                if (NioSelectorProcessor.this.selector == null) {
-                    LOGGER.debug("opening a new selector");
-
-                    try {
-                        NioSelectorProcessor.this.selector = Selector.open();
-                    } catch (IOException e) {
-                        LOGGER.error("IOException while opening a new Selector", e);
-                    }
-                }
-
-                for (;;) {
-                    try {
-                        // pop server sockets for removing
-                        if (NioSelectorProcessor.this.serversToRemove.size() > 0) {
-                            this.processServerRemove();
-                        }
-
-                        // pop new server sockets for accepting
-                        if (NioSelectorProcessor.this.serversToAdd.size() > 0) {
-                            this.processServerAdd();
-                        }
-
-                        // pop new session for starting read/write
-                        if (NioSelectorProcessor.this.sessionsToConnect.size() > 0) {
-                            this.processConnectSessions();
-                        }
-
-                        // pop session for close, if any
-                        if (NioSelectorProcessor.this.sessionsToClose.size() > 0) {
-                            this.processCloseSessions();
-                        }
-
-                        LOGGER.debug("selecting...");
-                        int readyCount = NioSelectorProcessor.this.selector.select(SELECT_TIMEOUT);
-                        LOGGER.debug("... done selecting : {}", readyCount);
-
-                        if (readyCount > 0) {
-                            // process selected keys
-                            Iterator<SelectionKey> selectedKeys = NioSelectorProcessor.this.selector.selectedKeys()
-                                    .iterator();
-
-                            // Loop on each SelectionKey and process any valid action
-                            while (selectedKeys.hasNext()) {
-                                SelectionKey key = selectedKeys.next();
-                                selectedKeys.remove();
-
-                                if (!key.isValid()) {
-                                    continue;
-                                }
-
-                                NioSelectorProcessor.this.selector.selectedKeys().remove(key);
-
-                                if (key.isAcceptable()) {
-                                    this.processAccept(key);
-                                }
-
-                                if (key.isReadable()) {
-                                    this.processRead(key);
-                                }
-
-                                if (key.isWritable()) {
-                                    this.processWrite(key);
-                                }
-
-                            }
-                        }
-
-                        // registering session with data in the write queue for
-                        // writing
-                        while (!NioSelectorProcessor.this.flushingSessions.isEmpty()) {
-                            this.processFlushSessions();
-                        }
-                    } catch (IOException e) {
-                        LOGGER.error("IOException while selecting selector", e);
-                    }
-
-                    // stop the worker if needed
-                    NioSelectorProcessor.this.workerLock.lock();
-
-                    try {
-                        if (NioSelectorProcessor.this.selector.keys().isEmpty()) {
-                            NioSelectorProcessor.this.worker = null;
-                            break;
-                        }
-                    } finally {
-                        NioSelectorProcessor.this.workerLock.unlock();
-                    }
-
-                    // check for idle events
-                    NioSelectorProcessor.this.idleChecker.processIdleSession(System.currentTimeMillis());
-                }
-            } catch (Exception e) {
-                LOGGER.error("Unexpected exception : ", e);
-            }
-        }
-
-        /**
-         * Handles the servers removal
-         */
-        private void processServerRemove() {
-            while (!NioSelectorProcessor.this.serversToRemove.isEmpty()) {
-                ServerSocketChannel channel = NioSelectorProcessor.this.serversToRemove.poll();
-                SelectionKey key = this.serverKey.remove(channel);
-
-                if (key == null) {
-                    LOGGER.error("The server socket was already removed of the selector");
-                } else {
-                    LOGGER.debug("Removing the server from this selector : {}", key);
-                    key.cancel();
-                }
-            }
-        }
-
-        /**
-         * Handles the servers addition
-         */
-        private void processServerAdd() throws IOException {
-            while (!NioSelectorProcessor.this.serversToAdd.isEmpty()) {
-                Object[] tmp = NioSelectorProcessor.this.serversToAdd.poll();
-                ServerSocketChannel channel = (ServerSocketChannel) tmp[0];
-                SelectionKey key = channel.register(NioSelectorProcessor.this.selector, SelectionKey.OP_ACCEPT);
-                key.attach(tmp);
-                LOGGER.debug("Accepted the server on this selector : {}", key);
-            }
-        }
-
-        /**
-         * Handles all the sessions that must be connected
-         */
-        private void processConnectSessions() throws IOException {
-            while (!NioSelectorProcessor.this.sessionsToConnect.isEmpty()) {
-                NioTcpSession session = NioSelectorProcessor.this.sessionsToConnect.poll();
-                SelectionKey key = session.getSocketChannel().register(NioSelectorProcessor.this.selector,
-                        SelectionKey.OP_READ);
-                key.attach(session);
-                this.sessionReadKey.put(session, key);
-
-                // Switch to CONNECTED, only if the session is not secured, as the SSL Handshake
-                // will occur later.
-                if (!session.isSecured()) {
-                    session.setConnected();
-
-                    // fire the event
-                    ((AbstractIoService) session.getService()).fireSessionCreated(session);
-                    session.processSessionOpened();
-                    long time = System.currentTimeMillis();
-                    NioSelectorProcessor.this.idleChecker.sessionRead(session, time);
-                    NioSelectorProcessor.this.idleChecker.sessionWritten(session, time);
-                }
-            }
-        }
-
-        /**
-         * Handles all the sessions that must be closed
-         */
-        private void processCloseSessions() throws IOException {
-            while (!NioSelectorProcessor.this.sessionsToClose.isEmpty()) {
-                NioTcpSession session = NioSelectorProcessor.this.sessionsToClose.poll();
-
-                SelectionKey key = this.sessionReadKey.remove(session);
-                key.cancel();
-
-                // closing underlying socket
-                session.getSocketChannel().close();
-                // fire the event
-                session.processSessionClosed();
-                ((AbstractIoService) session.getService()).fireSessionDestroyed(session);
-            }
-        }
-
-        /**
-         * Processes the Accept action for the given SelectionKey
-         */
-        private void processAccept(final SelectionKey key) throws IOException {
-            LOGGER.debug("acceptable new client {}", key);
-            ServerSocketChannel serverSocket = (ServerSocketChannel) ((Object[]) key.attachment())[0];
-            IoServer server = (IoServer) (((Object[]) key.attachment())[1]);
-            // accepted connection
-            SocketChannel newClientChannel = serverSocket.accept();
-            LOGGER.debug("client accepted");
-            // and give it's to the strategy
-            NioSelectorProcessor.this.strategy.getSelectorForNewSession(NioSelectorProcessor.this).createSession(
-                    server, newClientChannel);
-        }
-
-        /**
-         * Processes the Read action for the given SelectionKey
-         */
-        private void processRead(final SelectionKey key) throws IOException {
-            LOGGER.debug("readable client {}", key);
-            NioTcpSession session = (NioTcpSession) key.attachment();
-            SocketChannel channel = session.getSocketChannel();
-            NioSelectorProcessor.this.readBuffer.clear();
-            int readCount = channel.read(NioSelectorProcessor.this.readBuffer);
-
-            LOGGER.debug("read {} bytes", readCount);
-
-            if (readCount < 0) {
-                // session closed by the remote peer
-                LOGGER.debug("session closed by the remote peer");
-                NioSelectorProcessor.this.sessionsToClose.add(session);
-            } else {
-                // we have read some data
-                // limit at the current position & rewind buffer back to start & push to the chain
-                NioSelectorProcessor.this.readBuffer.flip();
-
-                if (session.isSecured()) {
-                    // We are reading data over a SSL/TLS encrypted connection. Redirect
-                    // the processing to the SslHelper class.
-                    SslHelper sslHelper = session.getAttribute(SSL_HELPER, null);
-
-                    if (sslHelper == null) {
-                        throw new IllegalStateException();
-                    }
-
-                    sslHelper.processRead(session, NioSelectorProcessor.this.readBuffer);
-                } else {
-                    // Plain message, not encrypted : go directly to the chain
-                    session.processMessageReceived(NioSelectorProcessor.this.readBuffer);
-                }
-
-                NioSelectorProcessor.this.idleChecker.sessionRead(session, System.currentTimeMillis());
-            }
-        }
-
-        /**
-         * Processes the Write action for the given SelectionKey
-         */
-        private void processWrite(final SelectionKey key) throws IOException {
-            NioTcpSession session = (NioTcpSession) key.attachment();
-
-            LOGGER.debug("writable session : {}", session);
-
-            session.setNotRegisteredForWrite();
-
-            // write from the session write queue
-            boolean isEmpty = false;
-
-            try {
-                Queue<WriteRequest> queue = session.acquireWriteQueue();
-
-                do {
-                    // get a write request from the queue
-                    WriteRequest wreq = queue.peek();
-
-                    if (wreq == null) {
-                        break;
-                    }
-
-                    ByteBuffer buf = (ByteBuffer) wreq.getMessage();
-
-                    // Note that if the connection is secured, the buffer already
-                    // contains encrypted data.
-                    int wrote = session.getSocketChannel().write(buf);
-                    session.incrementWrittenBytes(wrote);
-                    LOGGER.debug("wrote {} bytes to {}", wrote, session);
-
-                    NioSelectorProcessor.this.idleChecker.sessionWritten(session, System.currentTimeMillis());
-
-                    if (buf.remaining() == 0) {
-                        // completed write request, let's remove it
-                        queue.remove();
-                        // complete the future
-                        DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
-
-                        if (future != null) {
-                            future.complete();
-                        }
-                    } else {
-                        // output socket buffer is full, we need
-                        // to give up until next selection for
-                        // writing
-                        break;
-                    }
-                } while (!queue.isEmpty());
-
-                isEmpty = queue.isEmpty();
-            } finally {
-                session.releaseWriteQueue();
-            }
-
-            // if the session is no more interested in writing, we need
-            // to stop listening for OP_WRITE events
-            if (isEmpty) {
-                if (session.isClosing()) {
-                    LOGGER.debug("closing session {} have empty write queue, so we close it", session);
-                    // we was flushing writes, now we to the close
-                    session.getSocketChannel().close();
-                } else {
-                    // a key registered for read ? (because we can have a
-                    // Selector for reads and another for the writes
-                    SelectionKey readKey = this.sessionReadKey.get(session);
-
-                    if (readKey != null) {
-                        LOGGER.debug("registering key for only reading");
-                        SelectionKey mykey = session.getSocketChannel().register(NioSelectorProcessor.this.selector,
-                                SelectionKey.OP_READ, session);
-                        this.sessionReadKey.put(session, mykey);
-                    } else {
-                        LOGGER.debug("cancel key for writing");
-                        session.getSocketChannel().keyFor(NioSelectorProcessor.this.selector).cancel();
-                    }
-                }
-            }
-        }
-
-        /**
-         * Flushes the sessions
-         */
-        private void processFlushSessions() throws IOException {
-            NioTcpSession session = NioSelectorProcessor.this.flushingSessions.poll();
-            // a key registered for read ? (because we can have a
-            // Selector for reads and another for the writes
-            SelectionKey readKey = this.sessionReadKey.get(session);
-
-            if (readKey != null) {
-                // register for read/write
-                SelectionKey key = session.getSocketChannel().register(NioSelectorProcessor.this.selector,
-                        SelectionKey.OP_READ | SelectionKey.OP_WRITE, session);
-
-                this.sessionReadKey.put(session, key);
-            } else {
-                session.getSocketChannel().register(NioSelectorProcessor.this.selector, SelectionKey.OP_WRITE, session);
-            }
-        }
-    }
+	/** A logger for this class */
+	private static final Logger LOGGER = LoggerFactory
+			.getLogger(NioSelectorProcessor.class);
+
+	/**
+	 * A timeout used for the select, as we need to get out to deal with idle
+	 * sessions
+	 */
+	private static final long SELECT_TIMEOUT = 1000L;
+
+	private SelectorStrategy<NioSelectorProcessor> strategy;
+
+	private final Map<SocketAddress, ServerSocketChannel> serverSocketChannels = new ConcurrentHashMap<SocketAddress, ServerSocketChannel>();
+
+	private final Map<SocketAddress, DatagramChannel> datagramChannels = new ConcurrentHashMap<SocketAddress, DatagramChannel>();
+
+	/** Read buffer for all the incoming bytes (default to 64Kb) */
+	private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
+
+	/** the thread polling and processing the I/O events */
+	private SelectorWorker worker = null;
+
+	/** helper for detecting idleing sessions */
+	private final IdleChecker idleChecker = new IndexedIdleChecker();
+
+	/** A queue containing the servers to bind to this selector */
+	private final Queue<Object[]> serversToAdd = new ConcurrentLinkedQueue<Object[]>();
+
+	/** server to remove of the selector */
+	private final Queue<ServerSocketChannel> serversToRemove = new ConcurrentLinkedQueue<ServerSocketChannel>();
+
+	/**
+	 * new session freshly accepted, placed here for being added to the selector
+	 */
+	private final Queue<NioTcpSession> sessionsToConnect = new ConcurrentLinkedQueue<NioTcpSession>();
+
+	/** session to be removed of the selector */
+	private final Queue<NioTcpSession> sessionsToClose = new ConcurrentLinkedQueue<NioTcpSession>();
+
+	/** A queue used to store the sessions to be flushed */
+	private final Queue<NioTcpSession> flushingSessions = new ConcurrentLinkedQueue<NioTcpSession>();
+
+	private Selector selector;
+
+	// Lock for Selector worker, using default. can look into fairness later.
+	// We need to think about a lock less mechanism here.
+	private final Lock workerLock = new ReentrantLock();
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@SuppressWarnings("unchecked")
+	@Override
+	public void setStrategy(SelectorStrategy<?> strategy) {
+		this.strategy = (SelectorStrategy<NioSelectorProcessor>)strategy;
+	}
+
+	/**
+	 * Add a bound server channel for starting accepting new client connections.
+	 * 
+	 * @param serverChannel
+	 */
+	private void add(final ServerSocketChannel serverChannel,
+			final IoServer server) {
+		LOGGER.debug("adding a server channel {} for server {}", serverChannel,
+				server);
+		this.serversToAdd.add(new Object[] { serverChannel, server });
+		this.wakeupWorker();
+	}
+
+	/**
+	 * Wake the I/O worker thread and if none exists, create a new one FIXME :
+	 * too much locking there ?
+	 */
+	private void wakeupWorker() {
+		this.workerLock.lock();
+		try {
+			if (this.worker == null) {
+				this.worker = new SelectorWorker();
+				this.worker.start();
+			}
+		} finally {
+			this.workerLock.unlock();
+		}
+
+		if (this.selector != null) {
+			this.selector.wakeup();
+		}
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void bindTcpServer(final AbstractTcpServer server,
+			final SocketAddress address) throws IOException {
+		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+		serverSocketChannel.socket().setReuseAddress(server.isReuseAddress());
+		serverSocketChannel.socket().bind(address);
+		serverSocketChannel.configureBlocking(false);
+		this.serverSocketChannels.put(address, serverSocketChannel);
+		this.add(serverSocketChannel, server);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void bindUdpServer(AbstractUdpServer server, SocketAddress address)
+			throws IOException {
+		DatagramChannel datagramChannel = DatagramChannel.open();
+		datagramChannel.socket().setReuseAddress(server.isReuseAddress());
+		datagramChannel.socket().bind(address);
+		datagramChannel.configureBlocking(false);
+		this.datagramChannels.put(address, datagramChannel);
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void unbind(final SocketAddress address) throws IOException {
+		ServerSocketChannel channel = this.serverSocketChannels.get(address);
+		channel.socket().close();
+		channel.close();
+		if (this.serverSocketChannels.remove(address) == null) {
+			LOGGER.warn(
+					"The server channel for address {} was already unbound",
+					address);
+		}
+		LOGGER.debug("Removing a server channel {}", channel);
+		this.serversToRemove.add(channel);
+		this.wakeupWorker();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void createSession(final IoService service, final Object clientSocket)
+			throws SSLException {
+		LOGGER.debug("create session");
+		final SocketChannel socketChannel = (SocketChannel) clientSocket;
+		final TcpSessionConfig config = (TcpSessionConfig) service
+				.getSessionConfig();
+		final NioTcpSession session = new NioTcpSession(service, socketChannel,
+				this.strategy.getSelectorForNewSession(this));
+
+		try {
+			socketChannel.configureBlocking(false);
+		} catch (IOException e) {
+			LOGGER.error(
+					"Unexpected exception, while configuring socket as non blocking",
+					e);
+			throw new RuntimeIoException(
+					"cannot configure socket as non-blocking", e);
+		}
+		// apply idle configuration
+		session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE,
+				config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+		session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+				config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
+
+		// apply the default service socket configuration
+		Boolean keepAlive = config.isKeepAlive();
+
+		if (keepAlive != null) {
+			session.getConfig().setKeepAlive(keepAlive);
+		}
+
+		Boolean oobInline = config.isOobInline();
+
+		if (oobInline != null) {
+			session.getConfig().setOobInline(oobInline);
+		}
+
+		Boolean reuseAddress = config.isReuseAddress();
+
+		if (reuseAddress != null) {
+			session.getConfig().setReuseAddress(reuseAddress);
+		}
+
+		Boolean tcpNoDelay = config.isTcpNoDelay();
+
+		if (tcpNoDelay != null) {
+			session.getConfig().setTcpNoDelay(tcpNoDelay);
+		}
+
+		Integer receiveBufferSize = config.getReceiveBufferSize();
+
+		if (receiveBufferSize != null) {
+			session.getConfig().setReceiveBufferSize(receiveBufferSize);
+		}
+
+		Integer sendBufferSize = config.getSendBufferSize();
+
+		if (sendBufferSize != null) {
+			session.getConfig().setSendBufferSize(sendBufferSize);
+		}
+
+		Integer trafficClass = config.getTrafficClass();
+
+		if (trafficClass != null) {
+			session.getConfig().setTrafficClass(trafficClass);
+		}
+
+		Integer soLinger = config.getSoLinger();
+
+		if (soLinger != null) {
+			session.getConfig().setSoLinger(soLinger);
+		}
+
+		// Set the secured flag if the service is to be used over SSL/TLS
+		if (config.isSecured()) {
+			session.initSecure(config.getSslContext());
+		}
+
+		// event session created
+		session.processSessionCreated();
+
+		// add the session to the queue for being added to the selector
+		this.sessionsToConnect.add(session);
+		this.wakeupWorker();
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void flush(final AbstractIoSession session) {
+		LOGGER.debug("scheduling session {} for writing", session);
+		// add the session to the list of session to be registered for writing
+		this.flushingSessions.add((NioTcpSession) session);
+		// wake the selector for unlocking the I/O thread
+		this.wakeupWorker();
+	}
+
+	/**
+	 * The worker processing incoming session creation, session destruction
+	 * requests, session write and reads. It will also bind new servers.
+	 */
+	private class SelectorWorker extends Thread {
+		// map for finding the keys associated with a given server
+		private final Map<ServerSocketChannel, SelectionKey> serverKey = new HashMap<ServerSocketChannel, SelectionKey>();
+
+		// map for finding read keys associated with a given session
+		private final Map<NioTcpSession, SelectionKey> sessionReadKey = new HashMap<NioTcpSession, SelectionKey>();
+
+		@Override
+		public void run() {
+			try {
+				if (NioSelectorProcessor.this.selector == null) {
+					LOGGER.debug("opening a new selector");
+
+					try {
+						NioSelectorProcessor.this.selector = Selector.open();
+					} catch (IOException e) {
+						LOGGER.error(
+								"IOException while opening a new Selector", e);
+					}
+				}
+
+				for (;;) {
+					try {
+						// pop server sockets for removing
+						if (NioSelectorProcessor.this.serversToRemove.size() > 0) {
+							this.processServerRemove();
+						}
+
+						// pop new server sockets for accepting
+						if (NioSelectorProcessor.this.serversToAdd.size() > 0) {
+							this.processServerAdd();
+						}
+
+						// pop new session for starting read/write
+						if (NioSelectorProcessor.this.sessionsToConnect.size() > 0) {
+							this.processConnectSessions();
+						}
+
+						// pop session for close, if any
+						if (NioSelectorProcessor.this.sessionsToClose.size() > 0) {
+							this.processCloseSessions();
+						}
+
+						LOGGER.debug("selecting...");
+						int readyCount = NioSelectorProcessor.this.selector
+								.select(SELECT_TIMEOUT);
+						LOGGER.debug("... done selecting : {}", readyCount);
+
+						if (readyCount > 0) {
+							// process selected keys
+							Iterator<SelectionKey> selectedKeys = NioSelectorProcessor.this.selector
+									.selectedKeys().iterator();
+
+							// Loop on each SelectionKey and process any valid
+							// action
+							while (selectedKeys.hasNext()) {
+								SelectionKey key = selectedKeys.next();
+								selectedKeys.remove();
+
+								if (!key.isValid()) {
+									continue;
+								}
+
+								NioSelectorProcessor.this.selector
+										.selectedKeys().remove(key);
+
+								if (key.isAcceptable()) {
+									this.processAccept(key);
+								}
+
+								if (key.isReadable()) {
+									this.processRead(key);
+								}
+
+								if (key.isWritable()) {
+									this.processWrite(key);
+								}
+
+							}
+						}
+
+						// registering session with data in the write queue for
+						// writing
+						while (!NioSelectorProcessor.this.flushingSessions
+								.isEmpty()) {
+							this.processFlushSessions();
+						}
+					} catch (IOException e) {
+						LOGGER.error("IOException while selecting selector", e);
+					}
+
+					// stop the worker if needed
+					NioSelectorProcessor.this.workerLock.lock();
+
+					try {
+						if (NioSelectorProcessor.this.selector.keys().isEmpty()) {
+							NioSelectorProcessor.this.worker = null;
+							break;
+						}
+					} finally {
+						NioSelectorProcessor.this.workerLock.unlock();
+					}
+
+					// check for idle events
+					NioSelectorProcessor.this.idleChecker
+							.processIdleSession(System.currentTimeMillis());
+				}
+			} catch (Exception e) {
+				LOGGER.error("Unexpected exception : ", e);
+			}
+		}
+
+		/**
+		 * Handles the servers removal
+		 */
+		private void processServerRemove() {
+			while (!NioSelectorProcessor.this.serversToRemove.isEmpty()) {
+				ServerSocketChannel channel = NioSelectorProcessor.this.serversToRemove
+						.poll();
+				SelectionKey key = this.serverKey.remove(channel);
+
+				if (key == null) {
+					LOGGER.error("The server socket was already removed of the selector");
+				} else {
+					LOGGER.debug("Removing the server from this selector : {}",
+							key);
+					key.cancel();
+				}
+			}
+		}
+
+		/**
+		 * Handles the servers addition
+		 */
+		private void processServerAdd() throws IOException {
+			while (!NioSelectorProcessor.this.serversToAdd.isEmpty()) {
+				Object[] tmp = NioSelectorProcessor.this.serversToAdd.poll();
+				ServerSocketChannel channel = (ServerSocketChannel) tmp[0];
+				SelectionKey key = channel.register(
+						NioSelectorProcessor.this.selector,
+						SelectionKey.OP_ACCEPT);
+				key.attach(tmp);
+				LOGGER.debug("Accepted the server on this selector : {}", key);
+			}
+		}
+
+		/**
+		 * Handles all the sessions that must be connected
+		 */
+		private void processConnectSessions() throws IOException {
+			while (!NioSelectorProcessor.this.sessionsToConnect.isEmpty()) {
+				NioTcpSession session = NioSelectorProcessor.this.sessionsToConnect
+						.poll();
+				SelectionKey key = session.getSocketChannel().register(
+						NioSelectorProcessor.this.selector,
+						SelectionKey.OP_READ);
+				key.attach(session);
+				this.sessionReadKey.put(session, key);
+
+				// Switch to CONNECTED, only if the session is not secured, as
+				// the SSL Handshake
+				// will occur later.
+				if (!session.isSecured()) {
+					session.setConnected();
+
+					// fire the event
+					((AbstractIoService) session.getService())
+							.fireSessionCreated(session);
+					session.processSessionOpened();
+					long time = System.currentTimeMillis();
+					NioSelectorProcessor.this.idleChecker.sessionRead(session,
+							time);
+					NioSelectorProcessor.this.idleChecker.sessionWritten(
+							session, time);
+				}
+			}
+		}
+
+		/**
+		 * Handles all the sessions that must be closed
+		 */
+		private void processCloseSessions() throws IOException {
+			while (!NioSelectorProcessor.this.sessionsToClose.isEmpty()) {
+				NioTcpSession session = NioSelectorProcessor.this.sessionsToClose
+						.poll();
+
+				SelectionKey key = this.sessionReadKey.remove(session);
+				key.cancel();
+
+				// closing underlying socket
+				session.getSocketChannel().close();
+				// fire the event
+				session.processSessionClosed();
+				((AbstractIoService) session.getService())
+						.fireSessionDestroyed(session);
+			}
+		}
+
+		/**
+		 * Processes the Accept action for the given SelectionKey
+		 */
+		private void processAccept(final SelectionKey key) throws IOException {
+			LOGGER.debug("acceptable new client {}", key);
+			ServerSocketChannel serverSocket = (ServerSocketChannel) ((Object[]) key
+					.attachment())[0];
+			IoServer server = (IoServer) (((Object[]) key.attachment())[1]);
+			// accepted connection
+			SocketChannel newClientChannel = serverSocket.accept();
+			LOGGER.debug("client accepted");
+			// and give it's to the strategy
+			NioSelectorProcessor.this.strategy.getSelectorForNewSession(
+					NioSelectorProcessor.this).createSession(server,
+					newClientChannel);
+		}
+
+		/**
+		 * Processes the Read action for the given SelectionKey
+		 */
+		private void processRead(final SelectionKey key) throws IOException {
+			LOGGER.debug("readable client {}", key);
+			NioTcpSession session = (NioTcpSession) key.attachment();
+			SocketChannel channel = session.getSocketChannel();
+			NioSelectorProcessor.this.readBuffer.clear();
+			int readCount = channel.read(NioSelectorProcessor.this.readBuffer);
+
+			LOGGER.debug("read {} bytes", readCount);
+
+			if (readCount < 0) {
+				// session closed by the remote peer
+				LOGGER.debug("session closed by the remote peer");
+				NioSelectorProcessor.this.sessionsToClose.add(session);
+			} else {
+				// we have read some data
+				// limit at the current position & rewind buffer back to start &
+				// push to the chain
+				NioSelectorProcessor.this.readBuffer.flip();
+
+				if (session.isSecured()) {
+					// We are reading data over a SSL/TLS encrypted connection.
+					// Redirect
+					// the processing to the SslHelper class.
+					SslHelper sslHelper = session
+							.getAttribute(SSL_HELPER, null);
+
+					if (sslHelper == null) {
+						throw new IllegalStateException();
+					}
+
+					sslHelper.processRead(session,
+							NioSelectorProcessor.this.readBuffer);
+				} else {
+					// Plain message, not encrypted : go directly to the chain
+					session.processMessageReceived(NioSelectorProcessor.this.readBuffer);
+				}
+
+				NioSelectorProcessor.this.idleChecker.sessionRead(session,
+						System.currentTimeMillis());
+			}
+		}
+
+		/**
+		 * Processes the Write action for the given SelectionKey
+		 */
+		private void processWrite(final SelectionKey key) throws IOException {
+			NioTcpSession session = (NioTcpSession) key.attachment();
+
+			LOGGER.debug("writable session : {}", session);
+
+			session.setNotRegisteredForWrite();
+
+			// write from the session write queue
+			boolean isEmpty = false;
+
+			try {
+				Queue<WriteRequest> queue = session.acquireWriteQueue();
+
+				do {
+					// get a write request from the queue
+					WriteRequest wreq = queue.peek();
+
+					if (wreq == null) {
+						break;
+					}
+
+					ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+
+					// Note that if the connection is secured, the buffer
+					// already
+					// contains encrypted data.
+					int wrote = session.getSocketChannel().write(buf);
+					session.incrementWrittenBytes(wrote);
+					LOGGER.debug("wrote {} bytes to {}", wrote, session);
+
+					NioSelectorProcessor.this.idleChecker.sessionWritten(
+							session, System.currentTimeMillis());
+
+					if (buf.remaining() == 0) {
+						// completed write request, let's remove it
+						queue.remove();
+						// complete the future
+						DefaultWriteFuture future = (DefaultWriteFuture) wreq
+								.getFuture();
+
+						if (future != null) {
+							future.complete();
+						}
+					} else {
+						// output socket buffer is full, we need
+						// to give up until next selection for
+						// writing
+						break;
+					}
+				} while (!queue.isEmpty());
+
+				isEmpty = queue.isEmpty();
+			} finally {
+				session.releaseWriteQueue();
+			}
+
+			// if the session is no more interested in writing, we need
+			// to stop listening for OP_WRITE events
+			if (isEmpty) {
+				if (session.isClosing()) {
+					LOGGER.debug(
+							"closing session {} have empty write queue, so we close it",
+							session);
+					// we was flushing writes, now we to the close
+					session.getSocketChannel().close();
+				} else {
+					// a key registered for read ? (because we can have a
+					// Selector for reads and another for the writes
+					SelectionKey readKey = this.sessionReadKey.get(session);
+
+					if (readKey != null) {
+						LOGGER.debug("registering key for only reading");
+						SelectionKey mykey = session.getSocketChannel()
+								.register(NioSelectorProcessor.this.selector,
+										SelectionKey.OP_READ, session);
+						this.sessionReadKey.put(session, mykey);
+					} else {
+						LOGGER.debug("cancel key for writing");
+						session.getSocketChannel()
+								.keyFor(NioSelectorProcessor.this.selector)
+								.cancel();
+					}
+				}
+			}
+		}
+
+		/**
+		 * Flushes the sessions
+		 */
+		private void processFlushSessions() throws IOException {
+			NioTcpSession session = NioSelectorProcessor.this.flushingSessions
+					.poll();
+			// a key registered for read ? (because we can have a
+			// Selector for reads and another for the writes
+			SelectionKey readKey = this.sessionReadKey.get(session);
+
+			if (readKey != null) {
+				// register for read/write
+				SelectionKey key = session.getSocketChannel().register(
+						NioSelectorProcessor.this.selector,
+						SelectionKey.OP_READ | SelectionKey.OP_WRITE, session);
+
+				this.sessionReadKey.put(session, key);
+			} else {
+				session.getSocketChannel().register(
+						NioSelectorProcessor.this.selector,
+						SelectionKey.OP_WRITE, session);
+			}
+		}
+	}
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/nio/NioTcpServer.java Tue May 22 15:02:28 2012
@@ -21,8 +21,9 @@ package org.apache.mina.transport.tcp.ni
 
 import java.io.IOException;
 import java.net.SocketAddress;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.apache.mina.service.SelectorStrategy;
@@ -42,16 +43,17 @@ public class NioTcpServer extends Abstra
     static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
 
     // list of bound addresses
-    private final Set<SocketAddress> addresses = Collections.synchronizedSet(new HashSet<SocketAddress>());
-
+    private final Map<SocketAddress /* bound address */,NioSelectorProcessor /* used processor */> addresses = new HashMap<SocketAddress, NioSelectorProcessor>();
+    
     // the strategy for dispatching servers and client to selector threads.
-    private final SelectorStrategy strategy;
+    private final SelectorStrategy<NioSelectorProcessor> strategy;
 
+    // the default session confinguration
     private TcpSessionConfig config;
 
     private boolean reuseAddress = false;
 
-    public NioTcpServer(final SelectorStrategy strategy) {
+    public NioTcpServer(final SelectorStrategy<NioSelectorProcessor> strategy) {
         super();
         this.strategy = strategy;
         this.config = new DefaultTcpSessionConfig();
@@ -89,24 +91,26 @@ public class NioTcpServer extends Abstra
      * {@inheritDoc}
      */
     @Override
-    public void bind(final SocketAddress... localAddress) throws IOException {
+    public synchronized void bind(final SocketAddress... localAddress) throws IOException {
         if (localAddress == null) {
             // We should at least have one address to bind on
-            throw new IllegalStateException("LocalAdress cannot be null");
+            throw new IllegalArgumentException("LocalAdress cannot be null");
         }
 
         for (SocketAddress address : localAddress) {
             // check if the address is already bound
             synchronized (this) {
-                if (this.addresses.contains(address)) {
+                if (this.addresses.containsKey(address)) {
                     throw new IOException("address " + address + " already bound");
                 }
 
-                LOG.debug("binding address {}", address);
-
-                this.addresses.add(address);
-                NioSelectorProcessor processor = (NioSelectorProcessor) this.strategy.getSelectorForBindNewAddress();
-                processor.bindAndAcceptAddress(this, address);
+                LOG.info("binding address {}", address);
+                NioSelectorProcessor processor = this.strategy.getSelectorForBindNewAddress();
+                
+                this.addresses.put(address,processor);
+                
+                processor.bindTcpServer(this, address);
+                
                 if (this.addresses.size() == 1) {
                     // it's the first address bound, let's fire the event
                     this.fireServiceActivated();
@@ -119,23 +123,21 @@ public class NioTcpServer extends Abstra
      * {@inheritDoc}
      */
     @Override
-    public Set<SocketAddress> getLocalAddresses() {
-        return this.addresses;
+    public synchronized Set<SocketAddress> getLocalAddresses() {
+        return new HashSet<SocketAddress>(addresses.keySet());
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void unbind(final SocketAddress... localAddresses) throws IOException {
+    public synchronized void unbind(final SocketAddress... localAddresses) throws IOException {
         for (SocketAddress socketAddress : localAddresses) {
-            LOG.debug("unbinding {}", socketAddress);
-            synchronized (this) {
-                this.strategy.unbind(socketAddress);
-                this.addresses.remove(socketAddress);
-                if (this.addresses.isEmpty()) {
-                    this.fireServiceInactivated();
-                }
+            LOG.info("unbinding {}", socketAddress);
+            addresses.get(socketAddress).unbind(socketAddress);
+            this.addresses.remove(socketAddress);
+            if (this.addresses.isEmpty()) {
+                this.fireServiceInactivated();
             }
         }
     }
@@ -144,10 +146,12 @@ public class NioTcpServer extends Abstra
      * {@inheritDoc}
      */
     @Override
-    public void unbindAll() throws IOException {
-        for (SocketAddress socketAddress : this.addresses) {
-            this.unbind(socketAddress);
-        }
+    public synchronized void unbindAll() throws IOException {
+    	LOG.info("unbinding all");
+    	for(SocketAddress address : addresses.keySet()) {
+			LOG.debug("unbinding {}", address);
+    		addresses.remove(address).unbind(address);
+    	}
     }
 
 }
\ No newline at end of file

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/udp/AbstractUdpServer.java Tue May 22 15:02:28 2012
@@ -19,10 +19,6 @@
  */
 package org.apache.mina.transport.udp;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.Set;
-
 import javax.net.ssl.SSLException;
 
 import org.apache.mina.api.IoSession;
@@ -41,34 +37,22 @@ public abstract class AbstractUdpServer 
         super();
     }
 
-    @Override
-    public Set<SocketAddress> getLocalAddresses() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public void bind(SocketAddress... localAddress) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void unbindAll() throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void unbind(SocketAddress... localAddresses) throws IOException {
-        // TODO Auto-generated method stub
-
-    }
-
     /**
      * {@inheritDoc}
      */
     public void initSecured(IoSession session) throws SSLException {
-        // Do nothing : UDP does not support SSL
+        throw new RuntimeException("SSL is not supported for UDP");
     }
+    
+    /**
+     * Set the reuse address flag on the server socket
+     * @param reuseAddress <code>true</code> to enable
+     */
+    public abstract void setReuseAddress(boolean reuseAddress);
+
+    /**
+     * Is the reuse address enabled for this server.
+     * @return
+     */
+    public abstract boolean isReuseAddress();
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/udp/nio/NioUdpServer.java Tue May 22 15:02:28 2012
@@ -19,6 +19,7 @@
  */
 package org.apache.mina.transport.udp.nio;
 
+import java.io.IOException;
 import java.net.SocketAddress;
 import java.util.Collections;
 import java.util.HashSet;
@@ -26,6 +27,7 @@ import java.util.Set;
 
 import org.apache.mina.api.IoSessionConfig;
 import org.apache.mina.service.SelectorStrategy;
+import org.apache.mina.transport.tcp.NioSelectorProcessor;
 import org.apache.mina.transport.udp.AbstractUdpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +46,8 @@ public class NioUdpServer extends Abstra
 
     // the strategy for dispatching servers and client to selector threads.
     private final SelectorStrategy strategy;
+    
+    private boolean reuseAddress = false;
 
     /**
      * Create a new instance of NioUdpServer
@@ -61,4 +65,77 @@ public class NioUdpServer extends Abstra
         // TODO Auto-generated method stub
         return null;
     }
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public Set<SocketAddress> getLocalAddresses() {
+		// TODO Auto-generated method stub
+		return null;
+	}
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void bind(final SocketAddress... localAddress) throws IOException {
+        if (localAddress == null) {
+            // We should at least have one address to bind on
+            throw new IllegalArgumentException("LocalAdress cannot be null");
+        }
+
+        for (SocketAddress address : localAddress) {
+            // check if the address is already bound
+            synchronized (this) {
+                if (this.addresses.contains(address)) {
+                    throw new IOException("address " + address + " already bound");
+                }
+
+                LOG.debug("binding address {}", address);
+
+                this.addresses.add(address);
+                NioSelectorProcessor processor = (NioSelectorProcessor) this.strategy.getSelectorForBindNewAddress();
+                processor.bindUdpServer(this, address);
+                if (this.addresses.size() == 1) {
+                    // it's the first address bound, let's fire the event
+                    this.fireServiceActivated();
+                }
+            }
+        }
+    }
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void unbindAll() throws IOException {
+		// TODO Auto-generated method stub
+		
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void unbind(SocketAddress... localAddresses) throws IOException {
+		// TODO Auto-generated method stub
+		
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public void setReuseAddress(boolean reuseAddress) {
+		this.reuseAddress = reuseAddress;		
+	}
+
+	/**
+	 * {@inheritDoc}
+	 */
+	@Override
+	public boolean isReuseAddress() {
+		return this.reuseAddress;
+	}
 }

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java Tue May 22 15:02:28 2012
@@ -34,7 +34,6 @@ import org.apache.mina.filter.logging.Lo
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.filterchain.WriteFilterChainController;
 import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
 import org.apache.mina.transport.tcp.NioSelectorProcessor;
 import org.apache.mina.transport.tcp.nio.NioTcpServer;
 import org.slf4j.Logger;
@@ -53,8 +52,8 @@ public class NioEchoServer {
     public static void main(String[] args) {
         LOG.info("starting echo server");
 
-        OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
-                NioSelectorProcessor.class));
+        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
+        
         NioTcpServer acceptor = new NioTcpServer(strategy);
 
         // create the fitler chain for this service

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java Tue May 22 15:02:28 2012
@@ -39,7 +39,6 @@ import org.apache.mina.http.api.HttpRequ
 import org.apache.mina.http.api.HttpStatus;
 import org.apache.mina.http.api.HttpVersion;
 import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
 import org.apache.mina.transport.tcp.NioSelectorProcessor;
 import org.apache.mina.transport.tcp.nio.NioTcpServer;
 import org.slf4j.Logger;
@@ -51,8 +50,8 @@ public class HttpTest {
 
     public static void main(String[] args) throws Exception {
 
-        OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
-                NioSelectorProcessor.class));
+        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
+
         NioTcpServer acceptor = new NioTcpServer(strategy);
         acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),
                 new DummyHttpSever());

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java Tue May 22 15:02:28 2012
@@ -39,7 +39,6 @@ import org.apache.mina.http.api.HttpRequ
 import org.apache.mina.http.api.HttpStatus;
 import org.apache.mina.http.api.HttpVersion;
 import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
 import org.apache.mina.transport.tcp.NioSelectorProcessor;
 import org.apache.mina.transport.tcp.nio.NioTcpServer;
 import org.slf4j.Logger;
@@ -51,8 +50,7 @@ public class HttpsTest {
 
     public static void main(String[] args) throws Exception {
 
-        OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
-                NioSelectorProcessor.class));
+        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
         NioTcpServer acceptor = new NioTcpServer(strategy);
 
         acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java?rev=1341509&r1=1341508&r2=1341509&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java Tue May 22 15:02:28 2012
@@ -40,7 +40,6 @@ import org.apache.mina.filter.logging.Lo
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.ldap.LdapCodec;
 import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.service.SelectorFactory;
 import org.apache.mina.transport.tcp.NioSelectorProcessor;
 import org.apache.mina.transport.tcp.nio.NioTcpServer;
 import org.slf4j.Logger;
@@ -54,8 +53,7 @@ public class LdapTest {
 
     public static void main(String[] args) throws Exception {
         LdapTest ldapServer = new LdapTest();
-        OneThreadSelectorStrategy strategy = new OneThreadSelectorStrategy(new SelectorFactory(
-                NioSelectorProcessor.class));
+        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
         NioTcpServer acceptor = new NioTcpServer(strategy);
         acceptor.setFilters(new LoggingFilter("INCOMING"), new LdapCodec(), new LoggingFilter("DECODED"),
                 ldapServer.new DummyLdapSever());
@@ -113,7 +111,7 @@ public class LdapTest {
          */
         private void handle(IoSession session, BindRequest bindRequest) {
             // Build a faked BindResponse
-            BindResponse response = (BindResponse) bindRequest.getResultResponse();
+            BindResponse response = bindRequest.getResultResponse();
             response.getLdapResult().setResultCode(ResultCodeEnum.SUCCESS);
 
             session.write(response);