You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2012/09/30 17:35:06 UTC

svn commit: r1392037 - in /mina/trunk: core/src/main/java/org/apache/mina/session/ core/src/main/java/org/apache/mina/transport/nio/ core/src/test/java/org/apache/mina/service/idlecheker/ core/src/test/java/org/apache/mina/session/ examples/src/main/ja...

Author: jvermillard
Date: Sun Sep 30 15:35:05 2012
New Revision: 1392037

URL: http://svn.apache.org/viewvc?rev=1392037&view=rev
Log:
major rework of NioSelector

Added:
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java   (with props)
Removed:
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/AbstractNioSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorEventListener.java
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
    mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
    mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
    mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java Sun Sep 30 15:35:05 2012
@@ -37,10 +37,12 @@ import org.apache.mina.api.IoFilter;
 import org.apache.mina.api.IoFuture;
 import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSession;
+import org.apache.mina.api.RuntimeIoException;
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.filterchain.WriteFilterChainController;
 import org.apache.mina.service.SelectorProcessor;
 import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.util.AbstractIoFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,9 +70,6 @@ public abstract class AbstractIoSession 
     /** attributes map */
     private final AttributeContainer attributes = new DefaultAttributeContainer();
 
-    /** The {@link SelectorProcessor} used for handling this session writing */
-    protected SelectorProcessor writeProcessor;
-
     /** the {@link IdleChecker} in charge of detecting idle event for this session */
     protected final IdleChecker idleChecker;
 
@@ -149,14 +148,13 @@ public abstract class AbstractIoSession 
      * {@link org.apache.mina.api.IoSession#getId()}) and an associated {@link IoService}
      * 
      * @param service the service this session is associated with
-     * @param writeProcessor the processor in charge of processing this session write queue
+     * @param selectorLoop the selector loop in charge of processing this session read/write events
      */
-    public AbstractIoSession(IoService service, SelectorProcessor writeProcessor, IdleChecker idleChecker) {
+    public AbstractIoSession(IoService service, IdleChecker idleChecker) {
         // generated a unique id
         id = NEXT_ID.getAndIncrement();
         creationTime = System.currentTimeMillis();
         this.service = service;
-        this.writeProcessor = writeProcessor;
         this.chain = service.getFilters();
         this.idleChecker = idleChecker;
 
@@ -498,6 +496,10 @@ public abstract class AbstractIoSession 
         return attributes.removeAttribute(key);
     }
 
+    //----------------------------------------------------
+    // Write management
+    //----------------------------------------------------
+
     /**
      * {@inheritDoc}
      */
@@ -561,12 +563,14 @@ public abstract class AbstractIoSession 
         // If it wasn't, we register this session as interested to write.
         // It's done in atomic fashion for avoiding two concurrent registering.
         if (!registeredForWrite.getAndSet(true)) {
-            writeProcessor.flush(this);
+            flushWriteQueue();
         }
 
         return request;
     }
 
+    public abstract void flushWriteQueue();
+
     public void setNotRegisteredForWrite() {
         registeredForWrite.set(false);
     }
@@ -589,6 +593,60 @@ public abstract class AbstractIoSession 
     }
 
     //------------------------------------------------------------------------
+    // Close session management
+    //------------------------------------------------------------------------
+
+    /** we pre-allocate a close future for lock-less {@link #close(boolean)} */
+    private final IoFuture<Void> closeFuture = new AbstractIoFuture<Void>() {
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected boolean cancelOwner(boolean mayInterruptIfRunning) {
+            // we don't cancel close
+            return false;
+        }
+    };
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public IoFuture<Void> close(boolean immediately) {
+        switch (state) {
+        case CREATED:
+            LOG.error("Session {} not opened", this);
+            throw new RuntimeIoException("cannot close an not opened session");
+        case CONNECTED:
+            state = SessionState.CLOSING;
+            if (immediately) {
+                channelClose();
+            } else {
+                // flush this session the flushing code will close the session
+                flushWriteQueue();
+            }
+            break;
+        case CLOSING:
+            // return the same future
+            LOG.warn("Already closing session {}", this);
+            break;
+        case CLOSED:
+            LOG.warn("Already closed session {}", this);
+            break;
+        default:
+            throw new RuntimeIoException("not implemented session state : " + state);
+        }
+
+        return closeFuture;
+    }
+
+    /**
+     * Close the inner socket channel
+     */
+    protected abstract void channelClose();
+
+    //------------------------------------------------------------------------
     // Event processing using the filter chain
     //------------------------------------------------------------------------
 

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java?rev=1392037&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java Sun Sep 30 15:35:05 2012
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+import org.apache.mina.api.RuntimeIoException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public class NioSelectorLoop implements SelectorLoop {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NioSelectorLoop.class);
+
+    /**
+     * A timeout used for the select, as we need to get out to deal with idle
+     * sessions
+     */
+    private static final long SELECT_TIMEOUT = 1000L;
+
+    /** the selector managed by this class */
+    private Selector selector;
+
+    /** the worker thread in charge of polling the selector */
+    private final SelectorWorker worker = new SelectorWorker();
+
+    /**  the number of service using this selector */
+    private int serviceCount = 0;
+
+    /** Read buffer for all the incoming bytes (default to 64Kb) */
+    private final ByteBuffer readBuffer = ByteBuffer.allocate(64 * 1024);
+
+    public NioSelectorLoop() {
+        try {
+            selector = Selector.open();
+        } catch (IOException ioe) {
+            LOGGER.error("Impossible to open a new NIO selector, O/S is out of file descriptor ?");
+            throw new RuntimeIoException(ioe);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void register(final boolean accept, final boolean read, final boolean write,
+            final SelectorListener listener, SelectableChannel channel) {
+        LOGGER.debug("adding to registration queue : {} for accept : {}, read : {}, write : {}", new Object[] {
+                listener, accept, read, write });
+        int ops = 0;
+        if (accept) {
+            ops |= SelectionKey.OP_ACCEPT;
+        }
+        if (read) {
+            ops |= SelectionKey.OP_READ;
+        }
+        if (write) {
+            ops |= SelectionKey.OP_WRITE;
+        }
+        try {
+            channel.register(selector, ops, listener);
+        } catch (ClosedChannelException e) {
+            LOGGER.error("Trying to register an already closed channel : ", e);
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void modifyRegistration(final boolean accept, final boolean read, final boolean write,
+            final SelectorListener listener, SelectableChannel channel) {
+        LOGGER.debug("modifying registration : {} for accept : {}, read : {}, write : {}", new Object[] { listener,
+                accept, read, write });
+
+        SelectionKey key = channel.keyFor(selector);
+        if (key == null) {
+            LOGGER.error("Trying to modify the registration of a not registered channel");
+            return;
+        }
+
+        int ops = 0;
+        if (accept) {
+            ops |= SelectionKey.OP_ACCEPT;
+        }
+        if (read) {
+            ops |= SelectionKey.OP_READ;
+        }
+        if (write) {
+            ops |= SelectionKey.OP_WRITE;
+        }
+        key.interestOps(ops);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void unregister(final SelectorListener listener, SelectableChannel channel) {
+        LOGGER.debug("unregistering : {}", listener);
+        SelectionKey key = channel.keyFor(selector);
+        if (key == null) {
+            LOGGER.error("Trying to modify the registration of a not registered channel");
+            return;
+        }
+        key.cancel();
+        key.attach(null);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void incrementServiceCount() {
+        serviceCount++;
+        LOGGER.debug("service count: {}", serviceCount);
+        if (serviceCount == 1) {
+            worker.start();
+        }
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public synchronized void decrementServiceCount() {
+        serviceCount--;
+        LOGGER.debug("service count: {}", serviceCount);
+        if (serviceCount < 0) {
+            LOGGER.error("service count should not be negative : bug ?");
+        }
+    }
+
+    /**
+     * The worker processing incoming session creation, session destruction
+     * requests, session write and reads. It will also bind new servers.
+     */
+    private class SelectorWorker extends Thread {
+
+        @Override
+        public void run() {
+            if (selector == null) {
+                LOGGER.debug("opening a new selector");
+
+                try {
+                    selector = Selector.open();
+                } catch (IOException e) {
+                    LOGGER.error("IOException while opening a new Selector", e);
+                }
+            }
+
+            for (;;) {
+                try {
+                    LOGGER.debug("selecting...");
+                    int readyCount = selector.select(SELECT_TIMEOUT);
+                    LOGGER.debug("... done selecting : {}", readyCount);
+                    if (readyCount > 0) {
+                        for (SelectionKey key : selector.selectedKeys()) {
+                            SelectorListener listener = (SelectorListener) key.attachment();
+                            listener.ready(key.isAcceptable(), key.isReadable(), key.isReadable() ? readBuffer : null,
+                                    key.isWritable());
+                        }
+                    }
+
+                } catch (Exception e) {
+                    LOGGER.error("Unexpected exception : ", e);
+                }
+
+                // stop the worker if needed (no more service)
+                synchronized (NioSelectorLoop.this) {
+                    LOGGER.debug("remaing {} service", serviceCount);
+                    if (serviceCount <= 0) {
+                        LOGGER.debug("stop the worker");
+                        break;
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioSelectorLoop.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpServer.java Sun Sep 30 15:35:05 2012
@@ -26,8 +26,11 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 
-import org.apache.mina.service.SelectorStrategy;
+import org.apache.mina.api.IdleStatus;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.transport.tcp.AbstractTcpServer;
+import org.apache.mina.transport.tcp.TcpSessionConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,16 +39,15 @@ import org.slf4j.LoggerFactory;
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-public class NioTcpServer extends AbstractTcpServer implements SelectorEventListener {
+public class NioTcpServer extends AbstractTcpServer implements SelectorListener {
     static final Logger LOG = LoggerFactory.getLogger(NioTcpServer.class);
 
-    // the strategy for dispatching servers and client to selector threads.
-    private final SelectorStrategy<NioSelectorProcessor> strategy;
-
     // the bound local address
     private SocketAddress address = null;
 
-    private NioSelectorProcessor acceptProcessor = null;
+    private final SelectorLoop acceptSelectorLoop;
+
+    private final SelectorLoop readWriteSelectorLoop;
 
     // the key used for selecting accept event
     private SelectionKey acceptKey = null;
@@ -53,9 +55,12 @@ public class NioTcpServer extends Abstra
     // the server socket for accepting clients
     private ServerSocketChannel serverChannel = null;
 
-    public NioTcpServer(final SelectorStrategy<NioSelectorProcessor> strategy) {
+    private final IdleChecker idleChecker = new IndexedIdleChecker();
+
+    public NioTcpServer(final SelectorLoop acceptSelectorLoop, SelectorLoop readWriteSelectorLoop) {
         super();
-        this.strategy = strategy;
+        this.acceptSelectorLoop = acceptSelectorLoop;
+        this.readWriteSelectorLoop = readWriteSelectorLoop;
     }
 
     /**
@@ -93,12 +98,13 @@ public class NioTcpServer extends Abstra
         serverChannel.socket().bind(address);
         serverChannel.configureBlocking(false);
 
-        acceptProcessor = this.strategy.getSelectorForBindNewAddress();
-
-        acceptProcessor.addServer(this);
+        acceptSelectorLoop.register(true, false, false, this, serverChannel);
 
         // it's the first address bound, let's fire the event
         this.fireServiceActivated();
+
+        // will start the selector processor if we are the first service
+        acceptSelectorLoop.incrementServiceCount();
     }
 
     /**
@@ -120,10 +126,13 @@ public class NioTcpServer extends Abstra
         }
         serverChannel.socket().close();
         serverChannel.close();
-        acceptProcessor.removeServer(this);
+        acceptSelectorLoop.unregister(this, serverChannel);
 
         this.address = null;
         this.fireServiceInactivated();
+
+        // will stop the acceptor processor if we are the last service
+        acceptSelectorLoop.decrementServiceCount();
     }
 
     /**
@@ -144,30 +153,99 @@ public class NioTcpServer extends Abstra
      * {@inheritDoc}
      */
     @Override
-    public void acceptReady(NioSelectorProcessor processor) throws IOException {
-        LOG.debug("acceptable new client");
+    public void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write) {
+        if (accept) {
+            LOG.debug("acceptable new client");
+
+            // accepted connection
+            try {
+                LOG.debug("new client accepted");
+                createSession(getServerSocketChannel().accept());
+
+            } catch (IOException e) {
+                LOG.error("error while accepting new client", e);
+            }
+        }
+        if (read || write) {
+            throw new IllegalStateException("should not receive read or write events");
+        }
+    }
 
-        // accepted connection
-        SocketChannel newClientChannel = getServerSocketChannel().accept();
-        LOG.debug("client accepted");
+    private void createSession(final SocketChannel clientSocket) throws IOException {
+        LOG.debug("create session");
+        final SocketChannel socketChannel = clientSocket;
+        final TcpSessionConfig config = getSessionConfig();
+        final NioTcpSession session = new NioTcpSession(this, socketChannel, readWriteSelectorLoop, idleChecker);
+
+        socketChannel.configureBlocking(false);
+
+        // apply idle configuration
+        session.getConfig().setIdleTimeInMillis(IdleStatus.READ_IDLE, config.getIdleTimeInMillis(IdleStatus.READ_IDLE));
+        session.getConfig().setIdleTimeInMillis(IdleStatus.WRITE_IDLE,
+                config.getIdleTimeInMillis(IdleStatus.WRITE_IDLE));
 
-        // and give it's to the strategy
-        strategy.getSelectorForNewSession(processor).createSession(this, newClientChannel);
-    }
+        // apply the default service socket configuration
+        Boolean keepAlive = config.isKeepAlive();
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void readReady(NioSelectorProcessor processor, ByteBuffer readBuffer) {
-        throw new IllegalStateException("read event should never occur on NioTcpServer");
-    }
+        if (keepAlive != null) {
+            session.getConfig().setKeepAlive(keepAlive);
+        }
 
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public void writeReady(NioSelectorProcessor processor) {
-        throw new IllegalStateException("write event should never occur on NioTcpServer");
+        Boolean oobInline = config.isOobInline();
+
+        if (oobInline != null) {
+            session.getConfig().setOobInline(oobInline);
+        }
+
+        Boolean reuseAddress = config.isReuseAddress();
+
+        if (reuseAddress != null) {
+            session.getConfig().setReuseAddress(reuseAddress);
+        }
+
+        Boolean tcpNoDelay = config.isTcpNoDelay();
+
+        if (tcpNoDelay != null) {
+            session.getConfig().setTcpNoDelay(tcpNoDelay);
+        }
+
+        Integer receiveBufferSize = config.getReceiveBufferSize();
+
+        if (receiveBufferSize != null) {
+            session.getConfig().setReceiveBufferSize(receiveBufferSize);
+        }
+
+        Integer sendBufferSize = config.getSendBufferSize();
+
+        if (sendBufferSize != null) {
+            session.getConfig().setSendBufferSize(sendBufferSize);
+        }
+
+        Integer trafficClass = config.getTrafficClass();
+
+        if (trafficClass != null) {
+            session.getConfig().setTrafficClass(trafficClass);
+        }
+
+        Integer soLinger = config.getSoLinger();
+
+        if (soLinger != null) {
+            session.getConfig().setSoLinger(soLinger);
+        }
+
+        // Set the secured flag if the service is to be used over SSL/TLS
+        if (config.isSecured()) {
+            session.initSecure(config.getSslContext());
+        }
+
+        // event session created
+        session.processSessionCreated();
+
+        // add the session to the queue for being added to the selector
+        readWriteSelectorLoop.register(false, true, false, session, socketChannel);
+        readWriteSelectorLoop.incrementServiceCount();
+        session.processSessionOpened();
+        session.setConnected();
     }
+
 }
\ No newline at end of file

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioTcpSession.java Sun Sep 30 15:35:05 2012
@@ -26,16 +26,14 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.Queue;
 
-import org.apache.mina.api.IoFuture;
 import org.apache.mina.api.IoService;
-import org.apache.mina.api.RuntimeIoException;
 import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
 import org.apache.mina.session.DefaultWriteFuture;
 import org.apache.mina.session.SslHelper;
 import org.apache.mina.session.WriteRequest;
 import org.apache.mina.transport.tcp.ProxyTcpSessionConfig;
 import org.apache.mina.transport.tcp.TcpSessionConfig;
-import org.apache.mina.util.AbstractIoFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,32 +44,23 @@ import org.slf4j.LoggerFactory;
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  *
  */
-public class NioTcpSession extends AbstractNioSession implements SelectorEventListener {
+public class NioTcpSession extends AbstractIoSession implements SelectorListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(NioTcpSession.class);
 
     /** the NIO socket channel for this TCP session */
     private final SocketChannel channel;
 
+    /** the selector loop in charge of generating read/write events for this session */
+    private final SelectorLoop selectorLoop;
+
     /** the socket configuration */
     private final TcpSessionConfig configuration;
 
-    /** we pre-allocate a close future for lock-less {@link #close(boolean)} */
-    private final IoFuture<Void> closeFuture = new AbstractIoFuture<Void>() {
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override
-        protected boolean cancelOwner(boolean mayInterruptIfRunning) {
-            // we don't cancel close
-            return false;
-        }
-    };
-
-    NioTcpSession(IoService service, SocketChannel channel, NioSelectorProcessor writeProcessor, IdleChecker idleChecker) {
-        super(service, writeProcessor, idleChecker);
+    NioTcpSession(IoService service, SocketChannel channel, SelectorLoop selectorLoop, IdleChecker idleChecker) {
+        super(service, idleChecker);
         this.channel = channel;
+        this.selectorLoop = selectorLoop;
         this.configuration = new ProxyTcpSessionConfig(channel.socket());
     }
 
@@ -122,42 +111,6 @@ public class NioTcpSession extends Abstr
      * {@inheritDoc}
      */
     @Override
-    public IoFuture<Void> close(boolean immediately) {
-        switch (state) {
-        case CREATED:
-            LOG.error("Session {} not opened", this);
-            throw new RuntimeIoException("cannot close an not opened session");
-        case CONNECTED:
-            state = SessionState.CLOSING;
-            if (immediately) {
-                try {
-                    channel.close();
-                } catch (IOException e) {
-                    throw new RuntimeIoException(e);
-                }
-            } else {
-                // flush this session the flushing code will close the session
-                writeProcessor.flush(this);
-            }
-            break;
-        case CLOSING:
-            // return the same future
-            LOG.warn("Already closing session {}", this);
-            break;
-        case CLOSED:
-            LOG.warn("Already closed session {}", this);
-            break;
-        default:
-            throw new RuntimeIoException("not implemented session state : " + state);
-        }
-
-        return closeFuture;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
     public void suspendRead() {
         // TODO
         throw new RuntimeException("Not implemented");
@@ -196,7 +149,7 @@ public class NioTcpSession extends Abstr
     @Override
     public boolean isReadSuspended() {
         // TODO
-        throw new RuntimeException("Not implemented");
+        return false;
     }
 
     /**
@@ -231,120 +184,144 @@ public class NioTcpSession extends Abstr
      * {@inheritDoc}
      */
     @Override
-    public void acceptReady(NioSelectorProcessor processor) {
-        // should never happen
-        throw new IllegalStateException("accept event should never occur on NioTcpSession");
+    protected void channelClose() {
+        try {
+            selectorLoop.unregister(this, channel);
+            selectorLoop.decrementServiceCount();
+            channel.close();
+        } catch (IOException e) {
+            LOG.error("Exception while closing the channel : ", e);
+        }
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void readReady(NioSelectorProcessor processor, ByteBuffer readBuffer) throws IOException {
-        LOG.debug("readable session : {}", this);
-        readBuffer.clear();
-        int readCount = channel.read(readBuffer);
-
-        LOG.debug("read {} bytes", readCount);
-
-        if (readCount < 0) {
-            // session closed by the remote peer
-            LOG.debug("session closed by the remote peer");
-            processor.addSessionToClose(this);
-        } else {
-            // we have read some data
-            // limit at the current position & rewind buffer back to start &
-            // push to the chain
-            readBuffer.flip();
-
-            if (isSecured()) {
-                // We are reading data over a SSL/TLS encrypted connection.
-                // Redirect
-                // the processing to the SslHelper class.
-                SslHelper sslHelper = getAttribute(SSL_HELPER, null);
-
-                if (sslHelper == null) {
-                    throw new IllegalStateException();
-                }
-
-                sslHelper.processRead(this, readBuffer);
-            } else {
-                // Plain message, not encrypted : go directly to the chain
-                processMessageReceived(readBuffer);
-            }
-
-            idleChecker.sessionRead(this, System.currentTimeMillis());
-        }
-
+    public void flushWriteQueue() {
+        // register for write
+        selectorLoop.modifyRegistration(false, !isReadSuspended(), true, this, channel);
     }
 
     /**
      * {@inheritDoc}
      */
     @Override
-    public void writeReady(NioSelectorProcessor processor) throws IOException {
-        LOG.debug("writable session : {}", this);
+    public void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write) {
+        if (read) {
+            try {
 
-        setNotRegisteredForWrite();
+                LOG.debug("readable session : {}", this);
+                readBuffer.clear();
+                int readCount = channel.read(readBuffer);
 
-        // write from the session write queue
-        boolean isEmpty = false;
+                LOG.debug("read {} bytes", readCount);
 
-        try {
-            Queue<WriteRequest> queue = acquireWriteQueue();
-
-            do {
-                // get a write request from the queue
-                WriteRequest wreq = queue.peek();
+                if (readCount < 0) {
+                    // session closed by the remote peer
+                    LOG.debug("session closed by the remote peer");
+                    close(true);
+                } else {
+                    // we have read some data
+                    // limit at the current position & rewind buffer back to start &
+                    // push to the chain
+                    readBuffer.flip();
+
+                    if (isSecured()) {
+                        // We are reading data over a SSL/TLS encrypted connection.
+                        // Redirect
+                        // the processing to the SslHelper class.
+                        SslHelper sslHelper = getAttribute(SSL_HELPER, null);
+
+                        if (sslHelper == null) {
+                            throw new IllegalStateException();
+                        }
+
+                        sslHelper.processRead(this, readBuffer);
+                    } else {
+                        // Plain message, not encrypted : go directly to the chain
+                        processMessageReceived(readBuffer);
+                    }
 
-                if (wreq == null) {
-                    break;
+                    idleChecker.sessionRead(this, System.currentTimeMillis());
                 }
+            } catch (IOException e) {
+                LOG.error("Exception while reading : ", e);
+            }
 
-                ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+        }
+        if (write) {
+            try {
+                LOG.debug("ready for write");
+                LOG.debug("writable session : {}", this);
 
-                // Note that if the connection is secured, the buffer
-                // already
-                // contains encrypted data.
-                int wrote = getSocketChannel().write(buf);
-                incrementWrittenBytes(wrote);
-                LOG.debug("wrote {} bytes to {}", wrote, this);
-
-                idleChecker.sessionWritten(this, System.currentTimeMillis());
-
-                if (buf.remaining() == 0) {
-                    // completed write request, let's remove it
-                    queue.remove();
-                    // complete the future
-                    DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
+                setNotRegisteredForWrite();
 
-                    if (future != null) {
-                        future.complete();
-                    }
-                } else {
-                    // output socket buffer is full, we need
-                    // to give up until next selection for
-                    // writing
-                    break;
-                }
-            } while (!queue.isEmpty());
+                // write from the session write queue
+                boolean isEmpty = false;
 
-            isEmpty = queue.isEmpty();
-        } finally {
-            this.releaseWriteQueue();
-        }
+                try {
+                    Queue<WriteRequest> queue = acquireWriteQueue();
 
-        // if the session is no more interested in writing, we need
-        // to stop listening for OP_WRITE events
-        if (isEmpty) {
-            if (isClosing()) {
-                LOG.debug("closing session {} have empty write queue, so we close it", this);
-                // we was flushing writes, now we to the close
-                getSocketChannel().close();
-            } else {
-                // no more write event needed
-                processor.cancelKeyForWritting(this);
+                    do {
+                        // get a write request from the queue
+                        WriteRequest wreq = queue.peek();
+
+                        if (wreq == null) {
+                            break;
+                        }
+
+                        ByteBuffer buf = (ByteBuffer) wreq.getMessage();
+
+                        // Note that if the connection is secured, the buffer
+                        // already
+                        // contains encrypted data.
+                        int wrote = getSocketChannel().write(buf);
+                        incrementWrittenBytes(wrote);
+                        LOG.debug("wrote {} bytes to {}", wrote, this);
+
+                        idleChecker.sessionWritten(this, System.currentTimeMillis());
+
+                        if (buf.remaining() == 0) {
+                            // completed write request, let's remove it
+                            queue.remove();
+                            // complete the future
+                            DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
+
+                            if (future != null) {
+                                future.complete();
+                            }
+                        } else {
+                            // output socket buffer is full, we need
+                            // to give up until next selection for
+                            // writing
+                            break;
+                        }
+                    } while (!queue.isEmpty());
+
+                    isEmpty = queue.isEmpty();
+                } finally {
+                    this.releaseWriteQueue();
+                }
+
+                // if the session is no more interested in writing, we need
+                // to stop listening for OP_WRITE events
+                if (isEmpty) {
+                    if (isClosing()) {
+                        LOG.debug("closing session {} have empty write queue, so we close it", this);
+                        // we was flushing writes, now we to the close
+                        channelClose();
+                    } else {
+                        // no more write event needed
+                        selectorLoop.modifyRegistration(false, !isReadSuspended(), false, this, channel);
+                    }
+                }
+            } catch (IOException e) {
+                LOG.error("Exception while reading : ", e);
             }
         }
+        if (accept) {
+            throw new IllegalStateException("accept event should never occur on NioTcpSession");
+        }
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpServer.java Sun Sep 30 15:35:05 2012
@@ -28,7 +28,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.mina.api.IoSessionConfig;
-import org.apache.mina.service.SelectorStrategy;
+import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.transport.udp.AbstractUdpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,18 +39,18 @@ import org.slf4j.LoggerFactory;
  * 
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-public class NioUdpServer extends AbstractUdpServer implements SelectorEventListener {
+public class NioUdpServer extends AbstractUdpServer implements SelectorListener {
 
     static final Logger LOG = LoggerFactory.getLogger(NioUdpServer.class);
 
     // the bound local address
     private SocketAddress address = null;
 
-    // the strategy for dispatching servers and client to selector threads.
-    private final SelectorStrategy<NioSelectorProcessor> strategy;
-
     // the processor used for read and write this server
-    private NioSelectorProcessor processor;
+    private final NioSelectorLoop selectorLoop;
+
+    // used for detecting idle sessions
+    private final IdleChecker idleChecker = new IndexedIdleChecker();
 
     // the inner channel for read/write UDP datagrams
     private DatagramChannel datagramChannel = null;
@@ -63,9 +64,9 @@ public class NioUdpServer extends Abstra
     /**
      * Create a new instance of NioUdpServer
      */
-    public NioUdpServer(final SelectorStrategy<NioSelectorProcessor> strategy) {
+    public NioUdpServer(final NioSelectorLoop selectorLoop) {
         super();
-        this.strategy = strategy;
+        this.selectorLoop = selectorLoop;
     }
 
     /**
@@ -119,9 +120,8 @@ public class NioUdpServer extends Abstra
         datagramChannel.socket().bind(address);
         datagramChannel.configureBlocking(false);
 
-        processor = this.strategy.getSelectorForBindNewAddress();
-
-        processor.addServer(this);
+        selectorLoop.register(false, true, false, this, datagramChannel);
+        selectorLoop.incrementServiceCount();
 
         // it's the first address bound, let's fire the event
         this.fireServiceActivated();
@@ -136,11 +136,13 @@ public class NioUdpServer extends Abstra
         if (this.address == null) {
             throw new IllegalStateException("server not bound");
         }
+
+        selectorLoop.unregister(this, datagramChannel);
+        selectorLoop.decrementServiceCount();
+
         datagramChannel.socket().close();
         datagramChannel.close();
 
-        processor.removeServer(this);
-
         this.address = null;
         this.fireServiceInactivated();
     }
@@ -160,44 +162,35 @@ public class NioUdpServer extends Abstra
     }
 
     /**
-    * {@inheritDoc}
-    */
-    @Override
-    public void acceptReady(NioSelectorProcessor processor) {
-        throw new IllegalStateException("accept event should never occur on NioUdpServer");
-    }
-
-    /**
-    * {@inheritDoc}
-    */
+     * {@inheritDoc}
+     */
     @Override
-    public void readReady(NioSelectorProcessor processor, ByteBuffer readBuffer) throws IOException {
-        LOG.debug("readable datagram for UDP service : {}", this);
-        readBuffer.clear();
-
-        SocketAddress source = datagramChannel.receive(readBuffer);
-        readBuffer.flip();
-
-        LOG.debug("read {} bytes form {}", readBuffer.remaining(), source);
-
-        // let's find the corresponding session
-
-        NioUdpSession session = sessions.get(source);
-        if (session == null) {
-            session = new NioUdpSession(this, strategy.getSelectorForNewSession(processor), processor.getIdleChecker(),
-                    address, source);
+    public void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write) {
+        if (read) {
+            try {
+                LOG.debug("readable datagram for UDP service : {}", this);
+                readBuffer.clear();
+
+                SocketAddress source = datagramChannel.receive(readBuffer);
+                readBuffer.flip();
+
+                LOG.debug("read {} bytes form {}", readBuffer.remaining(), source);
+
+                // let's find the corresponding session
+
+                NioUdpSession session = sessions.get(source);
+                if (session == null) {
+                    session = new NioUdpSession(this, idleChecker, address, source);
+                }
+
+                session.receivedDatagram(readBuffer);
+            } catch (IOException ex) {
+                LOG.error("IOException while reading the socket", ex);
+            }
+        }
+        if (write) {
+            // TODO : flush session
         }
-
-        session.receivedDatagram(readBuffer);
-    }
-
-    /**
-    * {@inheritDoc}
-    */
-    @Override
-    public void writeReady(NioSelectorProcessor processor) throws IOException {
-        // TODO : flush the sessions
-
     }
 
 }
\ No newline at end of file

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/NioUdpSession.java Sun Sep 30 15:35:05 2012
@@ -27,6 +27,7 @@ import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSessionConfig;
 import org.apache.mina.api.RuntimeIoException;
 import org.apache.mina.service.idlechecker.IdleChecker;
+import org.apache.mina.session.AbstractIoSession;
 import org.apache.mina.util.AbstractIoFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +36,7 @@ import org.slf4j.LoggerFactory;
  * A UDP session based on NIO
  * @author <a href="http://mina.apache.org">Apache MINA Project</a>
  */
-public class NioUdpSession extends AbstractNioSession {
+public class NioUdpSession extends AbstractIoSession {
 
     private static final Logger LOG = LoggerFactory.getLogger(NioUdpSession.class);
 
@@ -61,9 +62,9 @@ public class NioUdpSession extends Abstr
      * @param writeProcessor
      * @param idleChecker
      */
-    public NioUdpSession(IoService service, NioSelectorProcessor writeProcessor, IdleChecker idleChecker,
-            SocketAddress localAddress, SocketAddress remoteAddress) {
-        super(service, writeProcessor, idleChecker);
+    public NioUdpSession(IoService service, IdleChecker idleChecker, SocketAddress localAddress,
+            SocketAddress remoteAddress) {
+        super(service, idleChecker);
         this.localAddress = localAddress;
         this.remoteAddress = remoteAddress;
     }
@@ -72,6 +73,22 @@ public class NioUdpSession extends Abstr
      * {@inheritDoc}
      */
     @Override
+    protected void channelClose() {
+        // No inner socket to close for UDP
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void flushWriteQueue() {
+        // TODO flush queue
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
     public SocketAddress getRemoteAddress() {
         return remoteAddress;
     }

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java?rev=1392037&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java Sun Sep 30 15:35:05 2012
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio;
+
+import java.nio.ByteBuffer;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public interface SelectorListener {
+
+    void ready(boolean accept, boolean read, ByteBuffer readBuffer, boolean write);
+
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java?rev=1392037&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java Sun Sep 30 15:35:05 2012
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mina.transport.nio;
+
+import java.nio.channels.SelectableChannel;
+
+/**
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ */
+public interface SelectorLoop {
+
+    public abstract void register(boolean accept, boolean read, boolean write, SelectorListener listener,
+            SelectableChannel channel);
+
+    public abstract void modifyRegistration(boolean accept, boolean read, boolean write, SelectorListener listener,
+            SelectableChannel channel);
+
+    public abstract void unregister(SelectorListener listener, SelectableChannel channel);
+
+    public abstract void incrementServiceCount();
+
+    public abstract void decrementServiceCount();
+
+}
\ No newline at end of file

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/nio/SelectorLoop.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/service/idlecheker/IndexedIdleChekerTest.java Sun Sep 30 15:35:05 2012
@@ -21,8 +21,8 @@
  */
 package org.apache.mina.service.idlecheker;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 import java.net.SocketAddress;
 
@@ -30,7 +30,6 @@ import org.apache.mina.api.IdleStatus;
 import org.apache.mina.api.IoFuture;
 import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSessionConfig;
-import org.apache.mina.service.SelectorProcessor;
 import org.apache.mina.service.idlechecker.IdleChecker;
 import org.apache.mina.service.idlechecker.IndexedIdleChecker;
 import org.apache.mina.session.AbstractIoSession;
@@ -85,8 +84,6 @@ public class IndexedIdleChekerTest {
         assertEquals(1, session.writeIdleCount);
     }
 
-    private final SelectorProcessor processor = mock(SelectorProcessor.class);
-
     private class DummySession extends AbstractIoSession {
 
         int readIdleCount = 0;
@@ -94,7 +91,7 @@ public class IndexedIdleChekerTest {
         int writeIdleCount = 0;
 
         private DummySession(IoService service, IdleChecker checker) {
-            super(service, processor, checker);
+            super(service, checker);
         }
 
         @Override
@@ -179,5 +176,21 @@ public class IndexedIdleChekerTest {
             // TODO Auto-generated method stub
             return false;
         }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected void channelClose() {
+
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public void flushWriteQueue() {
+
+        }
     }
 }

Modified: mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java (original)
+++ mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java Sun Sep 30 15:35:05 2012
@@ -18,16 +18,9 @@
  */
 package org.apache.mina.session;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static junit.framework.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
@@ -41,7 +34,6 @@ import org.apache.mina.api.IoService;
 import org.apache.mina.api.IoSessionConfig;
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.filterchain.WriteFilterChainController;
-import org.apache.mina.service.SelectorProcessor;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -52,11 +44,9 @@ import org.junit.Test;
  */
 public class AbstractIoSessionTest {
 
-    private final SelectorProcessor processor = mock(SelectorProcessor.class);
-
     private class DummySession extends AbstractIoSession {
         private DummySession(IoService service) {
-            super(service, processor, null);
+            super(service, null);
         }
 
         @Override
@@ -122,9 +112,23 @@ public class AbstractIoSessionTest {
 
         @Override
         public boolean isClosed() {
-            // TODO Auto-generated method stub
             return false;
         }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        protected void channelClose() {
+
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override
+        public void flushWriteQueue() {
+        }
     }
 
     private IoService service = null;
@@ -200,7 +204,6 @@ public class AbstractIoSessionTest {
         verify(filter1).messageWriting(eq(session), eq(buffer), any(WriteFilterChainController.class));
         verify(filter2).messageWriting(eq(session), eq(buffer), any(WriteFilterChainController.class));
         verify(filter3).messageWriting(eq(session), eq(buffer), any(WriteFilterChainController.class));
-        verify(processor).flush(eq(session));
     }
 
     @Test

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java Sun Sep 30 15:35:05 2012
@@ -33,8 +33,7 @@ import org.apache.mina.api.IoSession;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.filterchain.WriteFilterChainController;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
 import org.apache.mina.transport.nio.NioTcpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,9 +51,7 @@ public class NioEchoServer {
     public static void main(String[] args) {
         LOG.info("starting echo server");
 
-        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
-        
-        NioTcpServer acceptor = new NioTcpServer(strategy);
+        NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
 
         // create the fitler chain for this service
         acceptor.setFilters(new LoggingFilter("LoggingFilter1"), new IoFilter() {

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpTest.java Sun Sep 30 15:35:05 2012
@@ -38,8 +38,7 @@ import org.apache.mina.http.api.HttpMeth
 import org.apache.mina.http.api.HttpRequest;
 import org.apache.mina.http.api.HttpStatus;
 import org.apache.mina.http.api.HttpVersion;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
 import org.apache.mina.transport.nio.NioTcpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,9 +49,7 @@ public class HttpTest {
 
     public static void main(String[] args) throws Exception {
 
-        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
-
-        NioTcpServer acceptor = new NioTcpServer(strategy);
+        NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
         acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),
                 new DummyHttpSever());
 

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/http/HttpsTest.java Sun Sep 30 15:35:05 2012
@@ -38,8 +38,7 @@ import org.apache.mina.http.api.HttpMeth
 import org.apache.mina.http.api.HttpRequest;
 import org.apache.mina.http.api.HttpStatus;
 import org.apache.mina.http.api.HttpVersion;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
 import org.apache.mina.transport.nio.NioTcpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,8 +49,7 @@ public class HttpsTest {
 
     public static void main(String[] args) throws Exception {
 
-        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
-        NioTcpServer acceptor = new NioTcpServer(strategy);
+        NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
 
         acceptor.setFilters(new LoggingFilter("INCOMING"), new HttpServerCodec(), new LoggingFilter("DECODED"),
                 new DummyHttpSever());

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/ldap/LdapTest.java Sun Sep 30 15:35:05 2012
@@ -39,8 +39,7 @@ import org.apache.mina.api.IoSession;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.ldap.LdapCodec;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
 import org.apache.mina.transport.nio.NioTcpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,8 +52,7 @@ public class LdapTest {
 
     public static void main(String[] args) throws Exception {
         LdapTest ldapServer = new LdapTest();
-        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(new NioSelectorProcessor());
-        NioTcpServer acceptor = new NioTcpServer(strategy);
+        NioTcpServer acceptor = new NioTcpServer(new NioSelectorLoop(), new NioSelectorLoop());
         acceptor.setFilters(new LoggingFilter("INCOMING"), new LdapCodec(), new LoggingFilter("DECODED"),
                 ldapServer.new DummyLdapServer());
 

Modified: mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java
URL: http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java?rev=1392037&r1=1392036&r2=1392037&view=diff
==============================================================================
--- mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java (original)
+++ mina/trunk/examples/src/main/java/org/apache/mina/examples/udpecho/NioUdpEchoServer.java Sun Sep 30 15:35:05 2012
@@ -32,8 +32,7 @@ import org.apache.mina.api.IoSession;
 import org.apache.mina.filter.logging.LoggingFilter;
 import org.apache.mina.filterchain.ReadFilterChainController;
 import org.apache.mina.filterchain.WriteFilterChainController;
-import org.apache.mina.service.OneThreadSelectorStrategy;
-import org.apache.mina.transport.nio.NioSelectorProcessor;
+import org.apache.mina.transport.nio.NioSelectorLoop;
 import org.apache.mina.transport.nio.NioUdpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,10 +48,7 @@ public class NioUdpEchoServer {
     public static void main(String[] args) {
         LOG.info("starting echo server");
 
-        OneThreadSelectorStrategy<NioSelectorProcessor> strategy = new OneThreadSelectorStrategy<NioSelectorProcessor>(
-                new NioSelectorProcessor());
-
-        NioUdpServer server = new NioUdpServer(strategy);
+        NioUdpServer server = new NioUdpServer(new NioSelectorLoop());
 
         // create the fitler chain for this service
         server.setFilters(new LoggingFilter("LoggingFilter1"), new IoFilter() {