You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/09/16 16:16:17 UTC

svn commit: r576109 - in /mina/trunk/core/src/main/java/org/apache/mina: common/ transport/socket/nio/

Author: trustin
Date: Sun Sep 16 07:16:16 2007
New Revision: 576109

URL: http://svn.apache.org/viewvc?rev=576109&view=rev
Log:
* Replaced SocketIoProcessor with NIOProcessor
* DatagramConnector now uses NIOProcessor (TODO: make it multi-threaded)
* Added NIOSession


Added:
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java   (with props)
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java   (with props)
Removed:
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java Sun Sep 16 07:16:16 2007
@@ -147,11 +147,11 @@
                 doAdd(session);
                 addedSessions ++;
                 
-                // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
+                // DefaultIoFilterChain.CONNECT_FUTURE is cleared inside here
                 // in AbstractIoFilterChain.fireSessionOpened().
                 ((AbstractIoService) session.getService()).getListeners().fireSessionCreated(session);
             } catch (Exception e) {
-                // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
+                // Clear the DefaultIoFilterChain.CONNECT_FUTURE attribute
                 // and call ConnectFuture.setException().
                 session.getFilterChain().fireExceptionCaught(session, e);
             }
@@ -206,11 +206,12 @@
     }
     
     private void process(AbstractIoSession session) throws Exception {
-        if (((readyOps(session) & SelectionKey.OP_READ) != 0) && session.getTrafficMask().isReadable()) {
+        int readyOps = readyOps(session);
+        if (((readyOps & SelectionKey.OP_READ) != 0) && session.getTrafficMask().isReadable()) {
             read(session);
         }
 
-        if (((readyOps(session) & SelectionKey.OP_WRITE) != 0) && session.getTrafficMask().isWritable()) {
+        if (((readyOps & SelectionKey.OP_WRITE) != 0) && session.getTrafficMask().isWritable()) {
             scheduleFlush(session);
         }
     }
@@ -224,8 +225,15 @@
             int ret;
 
             try {
-                while ((ret = read(session, buf)) > 0) {
-                    readBytes += ret;
+                if (session.getTransportMetadata().hasFragmentation()) {
+                    while ((ret = read(session, buf)) > 0) {
+                        readBytes += ret;
+                    }
+                } else {
+                    ret = read(session, buf);
+                    if (ret > 0) {
+                        readBytes = ret;
+                    }
                 }
             } finally {
                 buf.flip();
@@ -237,16 +245,18 @@
                 session.getFilterChain().fireMessageReceived(session, buf);
                 buf = null;
                 
-                if (readBytes * 2 < config.getReadBufferSize()) {
-                    if (config.getReadBufferSize() > config.getMinReadBufferSize()) {
-                        config.setReadBufferSize(config.getReadBufferSize() >>> 1);
-                    }
-                } else if (readBytes == config.getReadBufferSize()) {
-                    int newReadBufferSize = config.getReadBufferSize() << 1;
-                    if (newReadBufferSize <= (config.getMaxReadBufferSize())) {
-                        config.setReadBufferSize(newReadBufferSize);
-                    } else {
-                        config.setReadBufferSize(config.getMaxReadBufferSize());
+                if (!session.getTransportMetadata().hasFragmentation()) {
+                    if (readBytes * 2 < config.getReadBufferSize()) {
+                        if (config.getReadBufferSize() > config.getMinReadBufferSize()) {
+                            config.setReadBufferSize(config.getReadBufferSize() >>> 1);
+                        }
+                    } else if (readBytes == config.getReadBufferSize()) {
+                        int newReadBufferSize = config.getReadBufferSize() << 1;
+                        if (newReadBufferSize <= (config.getMaxReadBufferSize())) {
+                            config.setReadBufferSize(newReadBufferSize);
+                        } else {
+                            config.setReadBufferSize(config.getMaxReadBufferSize());
+                        }
                     }
                 }
             }
@@ -423,7 +433,7 @@
                         region.setPosition(region.getPosition() + localWrittenBytes);
                         writtenBytes += localWrittenBytes;
                     }
-
+                    
                     if (region.getCount() > 0 || writtenBytes >= maxWrittenBytes) {
                         // Kernel buffer is full or wrote too much.
                         interestOps(session, interestOps(session) | SelectionKey.OP_WRITE);

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Sun Sep 16 07:16:16 2007
@@ -22,27 +22,17 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 
 import org.apache.mina.common.AbstractIoConnector;
-import org.apache.mina.common.DefaultIoFilterChain;
-import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.DefaultConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoProcessor;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.TransportMetadata;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.util.NamePreservingRunnable;
 import org.apache.mina.util.NewThreadExecutor;
 
 /**
@@ -54,23 +44,9 @@
 public class DatagramConnector extends AbstractIoConnector {
     private static volatile int nextId = 0;
 
-    private final Executor executor;
-
     private final int id = nextId++;
 
-    private final Selector selector;
-    
-    private final IoProcessor processor = new DatagramConnectorProcessor();
-
-    private final Queue<RegistrationRequest> registerQueue = new ConcurrentLinkedQueue<RegistrationRequest>();
-
-    private final Queue<DatagramSessionImpl> cancelQueue = new ConcurrentLinkedQueue<DatagramSessionImpl>();
-
-    private final Queue<DatagramSessionImpl> flushingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
-
-    private final Queue<DatagramSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<DatagramSessionImpl>();
-
-    private Worker worker;
+    private final IoProcessor processor;
 
     /**
      * Creates a new instance.
@@ -84,24 +60,8 @@
      */
     public DatagramConnector(Executor executor) {
         super(new DefaultDatagramSessionConfig());
-
-        try {
-            this.selector = Selector.open();
-        } catch (IOException e) {
-            throw new RuntimeIOException("Failed to open a selector.", e);
-        }
-
-        this.executor = executor;
-    }
-
-    @Override
-    protected void finalize() throws Throwable {
-        super.finalize();
-        try {
-            selector.close();
-        } catch (IOException e) {
-            ExceptionMonitor.getInstance().exceptionCaught(e);
-        }
+        
+        processor = new NIOProcessor("DatagramConnector-" + id, executor);
     }
 
     public TransportMetadata getTransportMetadata() {
@@ -118,6 +78,7 @@
                                       SocketAddress localAddress) {
         DatagramChannel ch = null;
         boolean initialized = false;
+        IoSession session = null;
         try {
             ch = DatagramChannel.open();
             DatagramSessionConfig cfg = getSessionConfig();
@@ -135,9 +96,17 @@
                 ch.socket().bind(localAddress);
             }
             ch.connect(remoteAddress);
-            ch.configureBlocking(false);
+            
+            session = new DatagramSessionImpl(this, ch, getHandler());
+            getFilterChainBuilder().buildFilterChain(session.getFilterChain());
+            ConnectFuture future = new DefaultConnectFuture();
+            // DefaultIoFilterChain will notify the connect future.
+            session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
+            
+            processor.add(session);
             initialized = true;
-        } catch (IOException e) {
+            return future;
+        } catch (Exception e) {
             return DefaultConnectFuture.newFailedFuture(e);
         } finally {
             if (!initialized && ch != null) {
@@ -149,341 +118,9 @@
                 }
             }
         }
-
-        RegistrationRequest request = new RegistrationRequest(ch);
-
-        startupWorker();
-        registerQueue.add(request);
-        selector.wakeup();
-
-        return request;
     }
     
     IoProcessor getProcessor() {
         return processor;
-    }
-    
-    private class DatagramConnectorProcessor implements IoProcessor {
-        public void add(IoSession session) {
-        }
-
-        public void flush(IoSession session, WriteRequest writeRequest) {
-            if (scheduleFlush((DatagramSessionImpl) session)) {
-                Selector selector = DatagramConnector.this.selector;
-                if (selector != null) {
-                    selector.wakeup();
-                }
-            }
-        }
-
-        public void remove(IoSession session) {
-            startupWorker();
-            cancelQueue.add((DatagramSessionImpl) session);
-            selector.wakeup();
-        }
-
-        public void updateTrafficMask(IoSession session) {
-            scheduleTrafficControl((DatagramSessionImpl) session);
-            Selector selector = DatagramConnector.this.selector;
-            if (selector != null) {
-                selector.wakeup();
-            }
-            selector.wakeup();
-        }
-    }
-
-    private synchronized void startupWorker() {
-        if (worker == null) {
-            worker = new Worker();
-            executor.execute(new NamePreservingRunnable(worker));
-        }
-    }
-
-    private boolean scheduleFlush(DatagramSessionImpl session) {
-        if (session.setScheduledForFlush(true)) {
-            flushingSessions.add(session);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    private void scheduleTrafficControl(DatagramSessionImpl session) {
-        trafficControllingSessions.add(session);
-    }
-
-    private void doUpdateTrafficMask() {
-        for (; ;) {
-            DatagramSessionImpl session = trafficControllingSessions.poll();
-            if (session == null) {
-                break;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.suspend??() or session.resume??() is
-            // called before addSession() is processed)
-            if (key == null) {
-                scheduleTrafficControl(session);
-                break;
-            }
-            // skip if channel is already closed
-            if (!key.isValid()) {
-                continue;
-            }
-
-            // The normal is OP_READ and, if there are write requests in the
-            // session's write queue, set OP_WRITE to trigger flushing.
-            int ops = SelectionKey.OP_READ;
-            Queue<WriteRequest> writeRequestQueue = session
-                    .getWriteRequestQueue();
-            synchronized (writeRequestQueue) {
-                if (!writeRequestQueue.isEmpty()) {
-                    ops |= SelectionKey.OP_WRITE;
-                }
-            }
-
-            // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps(ops & mask);
-        }
-    }
-
-    private class Worker implements Runnable {
-        public void run() {
-            Thread.currentThread().setName("DatagramConnector-" + id);
-
-            for (; ;) {
-                try {
-                    int nKeys = selector.select();
-
-                    registerNew();
-                    doUpdateTrafficMask();
-
-                    if (nKeys > 0) {
-                        processReadySessions(selector.selectedKeys());
-                    }
-
-                    flushSessions();
-                    cancelKeys();
-
-                    if (selector.keys().isEmpty()) {
-                        synchronized (DatagramConnector.this) {
-                            if (selector.keys().isEmpty()
-                                    && registerQueue.isEmpty()
-                                    && cancelQueue.isEmpty()) {
-                                worker = null;
-                                break;
-                            }
-                        }
-                    }
-                } catch (IOException e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e1) {
-                    }
-                }
-            }
-        }
-    }
-
-    private void processReadySessions(Set<SelectionKey> keys) {
-        Iterator<SelectionKey> it = keys.iterator();
-        while (it.hasNext()) {
-            SelectionKey key = it.next();
-            it.remove();
-
-            DatagramSessionImpl session = (DatagramSessionImpl) key
-                    .attachment();
-
-            if (key.isReadable() && session.getTrafficMask().isReadable()) {
-                readSession(session);
-            }
-
-            if (key.isWritable() && session.getTrafficMask().isWritable()) {
-                scheduleFlush(session);
-            }
-        }
-    }
-
-    private void readSession(DatagramSessionImpl session) {
-
-        ByteBuffer readBuf = ByteBuffer.allocate(session.getReadBufferSize());
-        try {
-            int readBytes = session.getChannel().read(readBuf.buf());
-            if (readBytes > 0) {
-                readBuf.flip();
-                ByteBuffer newBuf = ByteBuffer.allocate(readBuf.limit());
-                newBuf.put(readBuf);
-                newBuf.flip();
-
-                session.increaseReadBytes(readBytes);
-                session.getFilterChain().fireMessageReceived(session, newBuf);
-            }
-        } catch (IOException e) {
-            session.getFilterChain().fireExceptionCaught(session, e);
-        }
-    }
-
-    private void flushSessions() {
-        for (; ;) {
-            DatagramSessionImpl session = flushingSessions.poll();
-            if (session == null) {
-                break;
-            }
-            
-            session.setScheduledForFlush(false);
-
-            try {
-                boolean flushedAll = flush(session);
-                if (flushedAll && !session.getWriteRequestQueue().isEmpty() && !session.isScheduledForFlush()) {
-                    scheduleFlush(session);
-                }
-            } catch (IOException e) {
-                session.getFilterChain().fireExceptionCaught(session, e);
-            }
-        }
-    }
-
-    private boolean flush(DatagramSessionImpl session) throws IOException {
-        // Clear OP_WRITE
-        SelectionKey key = session.getSelectionKey();
-        if (key == null) {
-            scheduleFlush(session);
-            return false;
-        }
-        if (!key.isValid()) {
-            return false;
-        }
-        key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-        DatagramChannel ch = session.getChannel();
-        Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
-        int writtenBytes = 0;
-        int maxWrittenBytes = session.getConfig().getSendBufferSize() << 1;
-        try {
-            for (; ;) {
-                WriteRequest req;
-                synchronized (writeRequestQueue) {
-                    req = writeRequestQueue.peek();
-                }
-
-                if (req == null) {
-                    break;
-                }
-
-                ByteBuffer buf = (ByteBuffer) req.getMessage();
-                if (buf.remaining() == 0) {
-                    // pop and fire event
-                    synchronized (writeRequestQueue) {
-                        writeRequestQueue.poll();
-                    }
-
-                    session.increaseWrittenMessages();
-                    buf.reset();
-                    session.getFilterChain().fireMessageSent(session, req);
-                    continue;
-                }
-
-                int localWrittenBytes = ch.write(buf.buf());
-                if (localWrittenBytes == 0 || writtenBytes >= maxWrittenBytes) {
-                    // Kernel buffer is full or wrote too much
-                    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
-                    return false;
-                } else {
-                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
-
-                    // pop and fire event
-                    synchronized (writeRequestQueue) {
-                        writeRequestQueue.poll();
-                    }
-
-                    writtenBytes += localWrittenBytes;
-                    session.increaseWrittenMessages();
-                    buf.reset();
-                    session.getFilterChain().fireMessageSent(session, req);
-                }
-            }
-        } finally {
-            session.increaseWrittenBytes(writtenBytes);
-        }
-        
-        return true;
-    }
-
-    private void registerNew() {
-        for (; ;) {
-            RegistrationRequest req = registerQueue.poll();
-            if (req == null) {
-                break;
-            }
-
-            DatagramSessionImpl session = new DatagramSessionImpl(
-                    this, req.channel, getHandler());
-
-            // AbstractIoFilterChain will notify the connect future.
-            session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, req);
-
-            boolean success = false;
-            try {
-                SelectionKey key = req.channel.register(selector,
-                        SelectionKey.OP_READ, session);
-
-                session.setSelectionKey(key);
-                buildFilterChain(session);
-                // The CONNECT_FUTURE attribute is cleared and notified here.
-                getListeners().fireSessionCreated(session);
-                success = true;
-            } catch (Throwable t) {
-                // The CONNECT_FUTURE attribute is cleared and notified here.
-                session.getFilterChain().fireExceptionCaught(session, t);
-            } finally {
-                if (!success) {
-                    try {
-                        req.channel.disconnect();
-                        req.channel.close();
-                    } catch (IOException e) {
-                        ExceptionMonitor.getInstance().exceptionCaught(e);
-                    }
-                }
-            }
-        }
-    }
-
-    private void buildFilterChain(IoSession session) throws Exception {
-        getFilterChainBuilder().buildFilterChain(session.getFilterChain());
-    }
-
-    private void cancelKeys() {
-        for (; ;) {
-            DatagramSessionImpl session = cancelQueue.poll();
-            if (session == null) {
-                break;
-            } else {
-                SelectionKey key = session.getSelectionKey();
-                DatagramChannel ch = (DatagramChannel) key.channel();
-                try {
-                    ch.disconnect();
-                    ch.close();
-                } catch (IOException e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-                }
-
-                getListeners().fireSessionDestroyed(session);
-                key.cancel();
-                selector.wakeup(); // wake up again to trigger thread death
-            }
-        }
-    }
-
-    private static class RegistrationRequest extends DefaultConnectFuture {
-        private final DatagramChannel channel;
-
-        private RegistrationRequest(DatagramChannel channel) {
-            this.channel = channel;
-        }
     }
 }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/DatagramSessionImpl.java Sun Sep 16 07:16:16 2007
@@ -25,7 +25,6 @@
 import java.nio.channels.DatagramChannel;
 import java.nio.channels.SelectionKey;
 
-import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.DefaultIoFilterChain;
@@ -46,7 +45,7 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
-class DatagramSessionImpl extends AbstractIoSession implements DatagramSession {
+class DatagramSessionImpl extends NIOSession implements DatagramSession {
 
     static final TransportMetadata METADATA =
             new DefaultTransportMetadata(
@@ -70,8 +69,6 @@
 
     private SelectionKey key;
 
-    private int readBufferSize;
-
     /**
      * Creates a new acceptor instance.
      */
@@ -180,10 +177,6 @@
         return (InetSocketAddress) super.getServiceAddress();
     }
 
-    int getReadBufferSize() {
-        return readBufferSize;
-    }
-
     private class SessionConfigImpl extends AbstractDatagramSessionConfig {
 
         public int getReceiveBufferSize() {
@@ -200,7 +193,7 @@
                     ch.socket().setReceiveBufferSize(receiveBufferSize);
                     // Re-retrieve the effective receive buffer size.
                     receiveBufferSize = ch.socket().getReceiveBufferSize();
-                    DatagramSessionImpl.this.readBufferSize = receiveBufferSize;
+                    DatagramSessionImpl.this.config.setReadBufferSize(receiveBufferSize);
                 } catch (SocketException e) {
                     throw new RuntimeIOException(e);
                 }

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java?rev=576109&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java Sun Sep 16 07:16:16 2007
@@ -0,0 +1,175 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import org.apache.mina.common.AbstractIoProcessor;
+import org.apache.mina.common.AbstractIoSession;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.FileRegion;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.RuntimeIOException;
+
+/**
+ * 
+ * @author Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+class NIOProcessor extends AbstractIoProcessor {
+
+    protected final Selector selector;
+
+    NIOProcessor(String threadName, Executor executor) {
+        super(threadName, executor);
+    
+        try {
+            this.selector = Selector.open();
+        } catch (IOException e) {
+            throw new RuntimeIOException("Failed to open a selector.", e);
+        }
+    }
+    
+    @Override
+    protected void finalize() throws Throwable {
+        super.finalize();
+        try {
+            selector.close();
+        } catch (IOException e) {
+            ExceptionMonitor.getInstance().exceptionCaught(e);
+        }
+    }
+
+    @Override
+    protected int select(int timeout) throws Exception {
+        return selector.select(timeout); // pass the caller's timeout through rather than a hard-coded 1000
+    }
+
+    @Override
+    protected void wakeup() {
+        selector.wakeup();
+    }
+
+    @Override
+    protected Iterator<AbstractIoSession> allSessions() throws Exception {
+        return new IoSessionIterator(selector.keys());
+    }
+
+    protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+        return new IoSessionIterator(selector.selectedKeys());
+    }
+    
+    @Override
+    protected void doAdd(IoSession session) throws Exception {
+        SelectableChannel ch = (SelectableChannel) getChannel(session);
+        ch.configureBlocking(false);
+        setSelectionKey(
+                session,
+                ch.register(selector, SelectionKey.OP_READ, session));
+    }
+    
+    @Override
+    protected void doRemove(IoSession session) throws Exception {
+        ByteChannel ch = getChannel(session);
+        SelectionKey key = getSelectionKey(session);
+        key.cancel();
+        ch.close();
+    }
+
+    @Override
+    protected SessionState state(IoSession session) {
+        SelectionKey key = getSelectionKey(session);
+        if (key == null) {
+            return SessionState.PREPARING;
+        }
+        
+        return key.isValid()? SessionState.OPEN : SessionState.CLOSED;
+    }
+
+    @Override
+    protected int readyOps(IoSession session) throws Exception {
+        return getSelectionKey(session).readyOps();
+    }
+
+    @Override
+    protected int interestOps(IoSession session) throws Exception {
+        return getSelectionKey(session).interestOps();
+    }
+
+    @Override
+    protected void interestOps(IoSession session, int interestOps) throws Exception {
+        getSelectionKey(session).interestOps(interestOps);
+    }
+
+    @Override
+    protected int read(IoSession session, ByteBuffer buf) throws Exception {
+        return getChannel(session).read(buf.buf());
+    }
+    
+    @Override
+    protected int write(IoSession session, ByteBuffer buf) throws Exception {
+        return getChannel(session).write(buf.buf());
+    }
+    
+    @Override
+    protected long transferFile(IoSession session, FileRegion region) throws Exception {
+        return region.getFileChannel().transferTo(region.getPosition(), region.getCount(), getChannel(session));
+    }
+    
+    private ByteChannel getChannel(IoSession session) {
+        return ((NIOSession) session).getChannel();
+    }
+    
+    private SelectionKey getSelectionKey(IoSession session) {
+        return ((NIOSession) session).getSelectionKey();
+    }
+    
+    private void setSelectionKey(IoSession session, SelectionKey key) {
+        ((NIOSession) session).setSelectionKey(key);
+    }
+
+
+    protected static class IoSessionIterator implements Iterator<AbstractIoSession> {
+        private final Iterator<SelectionKey> i;
+        private IoSessionIterator(Set<SelectionKey> keys) {
+            i = keys.iterator();
+        }
+        public boolean hasNext() {
+            return i.hasNext();
+        }
+    
+        public AbstractIoSession next() {
+            SelectionKey key = i.next();
+            return (AbstractIoSession) key.attachment();
+        }
+    
+        public void remove() {
+            i.remove();
+        }
+    }
+}
\ No newline at end of file

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java?rev=576109&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java Sun Sep 16 07:16:16 2007
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *  
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License. 
+ *  
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.mina.common.AbstractIoSession;
+
+/**
+ * Package-private base class for sessions that expose a {@link ByteChannel} and hold a {@link SelectionKey}.
+ * @author Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+abstract class NIOSession extends AbstractIoSession {
+    abstract ByteChannel getChannel(); // channel exposed by this session
+    abstract SelectionKey getSelectionKey(); // key most recently stored via setSelectionKey - presumably; confirm in subclasses
+    abstract void setSelectionKey(SelectionKey key); // stores the key associated with this session
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NIOSession.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Sun Sep 16 07:16:16 2007
@@ -74,7 +74,7 @@
 
     private final Queue<CancellationRequest> cancelQueue = new ConcurrentLinkedQueue<CancellationRequest>();
 
-    private final SocketIoProcessor[] ioProcessors;
+    private final NIOProcessor[] ioProcessors;
 
     private final int processorCount;
 
@@ -145,12 +145,12 @@
         // Set other properties and initialize
         this.executor = executor;
         this.processorCount = processorCount;
-        ioProcessors = new SocketIoProcessor[processorCount];
+        ioProcessors = new NIOProcessor[processorCount];
 
         // create an array of SocketIoProcessors that will be used for
         // handling sessions.
         for (int i = 0; i < processorCount; i++) {
-            ioProcessors[i] = new SocketIoProcessor(
+            ioProcessors[i] = new NIOProcessor(
                     "SocketAcceptorIoProcessor-" + id + "." + i, executor);
         }
     }
@@ -433,7 +433,7 @@
         }
     }
 
-    private SocketIoProcessor nextProcessor() {
+    private NIOProcessor nextProcessor() {
         if (this.processorDistributor == Integer.MAX_VALUE) {
             this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Sun Sep 16 07:16:16 2007
@@ -62,7 +62,7 @@
 
     private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();
 
-    private final SocketIoProcessor[] ioProcessors;
+    private final NIOProcessor[] ioProcessors;
 
     private final int processorCount;
 
@@ -104,10 +104,10 @@
 
         this.executor = executor;
         this.processorCount = processorCount;
-        ioProcessors = new SocketIoProcessor[processorCount];
+        ioProcessors = new NIOProcessor[processorCount];
 
         for (int i = 0; i < processorCount; i++) {
-            ioProcessors[i] = new SocketIoProcessor(
+            ioProcessors[i] = new NIOProcessor(
                     "SocketConnectorIoProcessor-" + id + "." + i, executor);
         }
     }
@@ -295,7 +295,7 @@
         session.getProcessor().add(session);
     }
 
-    private SocketIoProcessor nextProcessor() {
+    private NIOProcessor nextProcessor() {
         if (this.processorDistributor == Integer.MAX_VALUE) {
             this.processorDistributor = Integer.MAX_VALUE % this.processorCount;
         }

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?rev=576109&r1=576108&r2=576109&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Sun Sep 16 07:16:16 2007
@@ -24,7 +24,6 @@
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
-import org.apache.mina.common.AbstractIoSession;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.DefaultIoFilterChain;
 import org.apache.mina.common.DefaultTransportMetadata;
@@ -43,7 +42,7 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev$, $Date$
  */
-class SocketSessionImpl extends AbstractIoSession implements SocketSession {
+class SocketSessionImpl extends NIOSession implements SocketSession {
 
     static final TransportMetadata METADATA =
             new DefaultTransportMetadata(