You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by tr...@apache.org on 2007/11/20 14:18:08 UTC

svn commit: r596653 - in /mina/trunk/core/src/main/java/org/apache/mina: common/AbstractPollingIoAcceptor.java transport/socket/nio/NioSocketAcceptor.java

Author: trustin
Date: Tue Nov 20 05:18:08 2007
New Revision: 596653

URL: http://svn.apache.org/viewvc?rev=596653&view=rev
Log:
Implemented an abstraction layer for polling acceptors

Added:
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java   (with props)
Modified:
    mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java

Added: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java?rev=596653&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java Tue Nov 20 05:18:08 2007
@@ -0,0 +1,375 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.net.SocketAddress;
+import java.nio.channels.ClosedSelectorException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.mina.util.NamePreservingRunnable;
+import org.apache.mina.util.NewThreadExecutor;
+
+/**
+ * @author The Apache MINA Project (dev@mina.apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H>
+        extends AbstractIoAcceptor {
+
+    private static final AtomicInteger id = new AtomicInteger();
+
+    private final Executor executor;
+    private final String threadName;
+    private final IoProcessor<T> processor;
+    private final boolean createdProcessor;
+
+    private final Object lock = new Object();
+
+    private final Queue<ServiceOperationFuture> registerQueue =
+        new ConcurrentLinkedQueue<ServiceOperationFuture>();
+    private final Queue<ServiceOperationFuture> cancelQueue =
+        new ConcurrentLinkedQueue<ServiceOperationFuture>();
+
+    private final Map<SocketAddress, H> boundHandles =
+        Collections.synchronizedMap(new HashMap<SocketAddress, H>());
+    private Worker worker;
+
+    /**
+     * Creates an acceptor that builds its own {@link SimpleIoProcessorPool}
+     * from the given processor class.  No executor is supplied, so the
+     * delegated constructor falls back to a {@link NewThreadExecutor}.
+     * The pool is flagged as created by this acceptor, so the worker
+     * disposes it once the acceptor itself is disposed.
+     * <p/>
+     * NOTE(review): the pool size used here is whatever the one-argument
+     * SimpleIoProcessorPool constructor chooses - confirm before relying on it.
+     *
+     * @param sessionConfig  passed to the super constructor
+     * @param processorClass the processor type the pool is built from
+     */
+    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass) {
+        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass), true);
+    }
+
+    /**
+     * Creates an acceptor that builds its own {@link SimpleIoProcessorPool}
+     * from the given processor class and processor count.  No executor is
+     * supplied, so a {@link NewThreadExecutor} is used, and the pool is
+     * flagged as created (and therefore disposed) by this acceptor.
+     *
+     * @param sessionConfig  passed to the super constructor
+     * @param processorClass the processor type the pool is built from
+     * @param processorCount forwarded to the SimpleIoProcessorPool constructor
+     */
+    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class<? extends IoProcessor<T>> processorClass, int processorCount) {
+        this(sessionConfig, null, new SimpleIoProcessorPool<T>(processorClass, processorCount), true);
+    }
+    
+    /**
+     * Creates an acceptor around a caller-supplied processor.  No executor
+     * is supplied, so a {@link NewThreadExecutor} is used.  The processor is
+     * flagged as not created by this acceptor, so the worker never disposes it.
+     *
+     * @param sessionConfig passed to the super constructor
+     * @param processor     the processor to use; must not be {@code null}
+     */
+    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, IoProcessor<T> processor) {
+        this(sessionConfig, null, processor, false);
+    }
+
+    /**
+     * Creates an acceptor around a caller-supplied executor and processor.
+     * The processor is flagged as not created by this acceptor, so the
+     * worker never disposes it.
+     *
+     * @param sessionConfig passed to the super constructor
+     * @param executor      runs the worker; {@code null} selects a
+     *                      {@link NewThreadExecutor}
+     * @param processor     the processor to use; must not be {@code null}
+     */
+    protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor) {
+        this(sessionConfig, executor, processor, false);
+    }
+
+    private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Executor executor, IoProcessor<T> processor, boolean createdProcessor) {
+        super(sessionConfig);
+        
+        if (executor == null) {
+            executor = new NewThreadExecutor();
+        }
+        if (processor == null) {
+            throw new NullPointerException("processor");
+        }
+        
+        this.executor = executor;
+        this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
+        this.processor = processor;
+        this.createdProcessor = createdProcessor;
+        
+        doInit();
+    }
+
+    /**
+     * Initialization hook, invoked as the last statement of the constructor
+     * of this class.  It therefore runs before the subclass constructor body
+     * and subclass field initializers have executed.
+     */
+    protected abstract void doInit();
+
+    /**
+     * Disposal hook.  The worker calls it after its loop has ended and
+     * {@code isDisposed()} returns {@code true}, following disposal of an
+     * acceptor-owned processor (if any).
+     */
+    protected abstract void doDispose0();
+
+    /**
+     * Tells whether polling is still possible.  When this returns
+     * {@code false}, starting the worker clears both request queues and
+     * throws a {@link ClosedSelectorException}.
+     */
+    protected abstract boolean selectable();
+
+    /**
+     * Polls for ready handles; called at the top of every worker loop
+     * iteration.  Presumably blocks until a handle is ready or
+     * {@link #wakeup()} is called - confirm against the implementation.
+     *
+     * @return {@code true} if the worker should process
+     *         {@link #selectedHandles()}
+     */
+    protected abstract boolean select() throws Exception;
+
+    /**
+     * Wakes the worker out of {@link #select()}.  Called after a bind or
+     * unbind request is queued, and by the worker itself after a bind
+     * rollback and after each unbind.
+     */
+    protected abstract void wakeup();
+
+    /**
+     * Returns the handles reported ready by the last {@link #select()}.
+     * The worker calls {@code remove()} on the iterator after each
+     * {@code next()}, so the iterator must support removal.
+     */
+    protected abstract Iterator<H> selectedHandles();
+
+    /**
+     * Binds the given address and returns the resulting handle.  Called
+     * from the worker once per address of a bind request.
+     */
+    protected abstract H bind(SocketAddress localAddress) throws Exception;
+
+    /**
+     * Returns the local address of a bound handle.  The result is used as
+     * the key under which the handle is tracked and is reported as one of
+     * the acceptor's local addresses.
+     */
+    protected abstract SocketAddress localAddress(H handle) throws Exception;
+
+    /**
+     * Accepts a new session on the given ready handle.
+     *
+     * @param processor the processor of this acceptor
+     * @param handle    a handle returned by {@link #selectedHandles()}
+     * @return the new session, or {@code null} if no session was created
+     */
+    protected abstract T accept(IoProcessor<T> processor, H handle) throws Exception;
+
+    /**
+     * Releases a handle previously returned by {@link #bind(SocketAddress)}.
+     */
+    protected abstract void unbind(H handle) throws Exception;
+
+    /**
+     * Queues a bind request for the worker, blocks until the worker has
+     * handled it, and then publishes the local addresses of all bound
+     * handles.
+     *
+     * @throws Exception the failure the worker recorded on the request, or
+     *                   whatever {@link #localAddress(Object)} throws
+     */
+    @Override
+    protected void doBind() throws Exception {
+        ServiceOperationFuture bindRequest = new ServiceOperationFuture();
+
+        // Hand the request over, make sure a worker exists to pick it up,
+        // and wake that worker so it drains the queue.
+        registerQueue.add(bindRequest);
+        startupWorker();
+        wakeup();
+
+        bindRequest.awaitUninterruptibly();
+        if (bindRequest.getException() != null) {
+            throw bindRequest.getException();
+        }
+
+        // The local addresses are refreshed here, on the calling thread:
+        // calling setLocalAddresses() from the worker thread would deadlock.
+        Set<SocketAddress> boundAddresses = new HashSet<SocketAddress>();
+        Iterator<H> bound = boundHandles.values().iterator();
+        while (bound.hasNext()) {
+            boundAddresses.add(localAddress(bound.next()));
+        }
+        setLocalAddresses(boundAddresses);
+    }
+
+    /**
+     * Ensures a worker is running; called by doBind() and doUnbind().
+     * If no worker exists yet (the field is null), one is created and
+     * handed to the executor under this acceptor's thread name.  If a
+     * worker already exists, nothing happens.
+     *
+     * @throws ClosedSelectorException if {@link #selectable()} returns
+     *         {@code false}; both request queues are cleared first
+     */
+    private void startupWorker() {
+        if (selectable()) {
+            synchronized (lock) {
+                if (worker == null) {
+                    Worker created = new Worker();
+                    worker = created;
+                    executor.execute(new NamePreservingRunnable(created, threadName));
+                }
+            }
+            return;
+        }
+
+        // Polling is no longer possible: drop pending requests and fail.
+        registerQueue.clear();
+        cancelQueue.clear();
+        throw new ClosedSelectorException();
+    }
+
+    /**
+     * Queues an unbind request for the worker and blocks until the worker
+     * has handled it.
+     *
+     * @throws Exception the failure recorded on the request, if any
+     */
+    @Override
+    protected void doUnbind() throws Exception {
+        ServiceOperationFuture unbindRequest = new ServiceOperationFuture();
+        cancelQueue.add(unbindRequest);
+
+        // Make sure a worker exists and is awake to service the request.
+        startupWorker();
+        wakeup();
+
+        unbindRequest.awaitUninterruptibly();
+        if (unbindRequest.getException() != null) {
+            throw unbindRequest.getException();
+        }
+    }
+
+    /**
+     * This class is called by the startupWorker() method and is
+     * placed into a NamePreservingRunnable class.
+     */
+    private class Worker implements Runnable {
+        /**
+         * Worker loop.  Each iteration polls via select(), services queued
+         * bind requests, accepts sessions on the ready handles, and then
+         * services queued unbind requests.  The loop ends when the handle
+         * counter is zero and both request queues are empty; after that,
+         * if the acceptor has been disposed, the owned processor (if any)
+         * and the subclass resources are released.
+         */
+        public void run() {
+            // Running total: values returned by registerHandles() minus
+            // values returned by unregisterHandles().
+            int nHandles = 0;
+            
+            for (;;) {
+                try {
+                    // true when select() reports that handles are ready
+                    boolean selected = select();
+
+                    // service pending bind requests; this is where the
+                    // requested addresses actually get bound
+                    nHandles += registerHandles();
+
+                    if (selected) {
+                        processHandles(selectedHandles());
+                    }
+
+                    // check to see if any cancellation request has been made.
+                    nHandles -= unregisterHandles();
+
+                    if (nHandles == 0) {
+                        // Re-check the queues under the same lock that
+                        // startupWorker() takes, and clear the worker field
+                        // there, so that a later request finds the field
+                        // null and starts a fresh worker.
+                        synchronized (lock) {
+                            if (registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty()) {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                } catch (Throwable e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+
+                    // Pause before the next iteration after a failure.
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e1) {
+                        // The interrupt is reported but not restored on
+                        // the thread.
+                        ExceptionMonitor.getInstance().exceptionCaught(e1);
+                    }
+                }
+            }
+            
+            // Final cleanup happens on the worker thread, and only once
+            // the acceptor has been disposed.
+            if (isDisposed()) {
+                try {
+                    // Only dispose a processor this acceptor created itself.
+                    if (createdProcessor) {
+                        processor.dispose();
+                    }
+                } finally {
+                    doDispose0();
+                }
+            }
+        }
+
+        /**
+         * Accepts a session on every handle the last select() reported as
+         * ready.  Each handle is removed from the iterator as it is taken.
+         * A session returned by accept() gets its initialization finished
+         * and is then added to its processor.
+         * <p/>
+         * When accept() returns {@code null} there was simply nothing to
+         * accept on that handle: the handle stays bound and the remaining
+         * handles are still processed.
+         * <p/>
+         * Failures are reported to the ExceptionMonitor and never propagate.
+         *
+         * @param handles iterator over the ready handles; must support
+         *                {@code remove()}
+         */
+        @SuppressWarnings("unchecked")
+        private void processHandles(Iterator<H> handles) {
+            while (handles.hasNext()) {
+                H handle = handles.next();
+                handles.remove();
+
+                // Assume failure until one of the two normal outcomes
+                // (nothing to accept, or session handed over) is reached.
+                boolean failed = true;
+                try {
+                    T session = accept(processor, handle);
+                    if (session == null) {
+                        // Nothing pending on this handle.  This is not an
+                        // error, so the listening handle must stay bound
+                        // and the other ready handles must still be served.
+                        failed = false;
+                        continue;
+                    }
+
+                    finishSessionInitialization(session, null);
+
+                    // hand the session over to its processor
+                    session.getProcessor().add(session);
+                    failed = false;
+                } catch (Throwable t) {
+                    ExceptionMonitor.getInstance().exceptionCaught(t);
+                } finally {
+                    if (failed) {
+                        // NOTE(review): a failure unbinds the listening
+                        // handle but leaves its entry in boundHandles, and
+                        // a session created before the failure is not
+                        // closed here - confirm this is intended.
+                        try {
+                            unbind(handle);
+                        } catch (Throwable t) {
+                            ExceptionMonitor.getInstance().exceptionCaught(t);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Services every queued bind request.  For each request, all addresses
+     * returned by getLocalAddresses() are bound.  If every bind succeeds
+     * the new handles are recorded in boundHandles and the request is
+     * completed.  If any step fails the exception is recorded on the
+     * request, every handle bound for that request is unbound again, and
+     * the worker is woken up.
+     *
+     * @return the number of handles this call newly added to boundHandles
+     *         (not the total number of bound handles), so that the caller
+     *         can keep a running count by adding this value
+     */
+    private int registerHandles() {
+        int registered = 0;
+
+        for (;;) {
+            ServiceOperationFuture future = registerQueue.poll();
+            if (future == null) {
+                break;
+            }
+
+            Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
+            List<SocketAddress> localAddresses = getLocalAddresses();
+
+            try {
+                for (SocketAddress a: localAddresses) {
+                    // If bind() itself throws there is no handle to clean up.
+                    H handle = bind(a);
+
+                    boolean tracked = false;
+                    try {
+                        newHandles.put(localAddress(handle), handle);
+                        tracked = true;
+                    } finally {
+                        // The handle was bound but could not be recorded in
+                        // newHandles, so the rollback below would miss it.
+                        // Release it here instead.
+                        if (!tracked && handle != null) {
+                            try {
+                                unbind(handle);
+                            } catch (Exception e) {
+                                ExceptionMonitor.getInstance().exceptionCaught(e);
+                            }
+                        }
+                    }
+                }
+
+                // Count only what this request actually added.
+                int before = boundHandles.size();
+                boundHandles.putAll(newHandles);
+                registered += boundHandles.size() - before;
+
+                // and notify.
+                future.setDone();
+            } catch (Exception e) {
+                future.setException(e);
+            } finally {
+                // Roll back if failed to bind all addresses.
+                if (future.getException() != null) {
+                    for (H handle: newHandles.values()) {
+                        try {
+                            unbind(handle);
+                        } catch (Exception e) {
+                            ExceptionMonitor.getInstance().exceptionCaught(e);
+                        }
+                    }
+                    wakeup();
+                }
+            }
+        }
+
+        return registered;
+    }
+
+    /**
+     * Services every queued unbind request.  The cancelQueue holds
+     * ServiceOperationFuture objects, which doUnbind() puts there.
+     * For each request, every handle in boundHandles is unbound (failures
+     * are reported to the ExceptionMonitor), boundHandles is emptied, and
+     * the request is completed.
+     *
+     * @return the number of handles for which an unbind was attempted
+     */
+    private int unregisterHandles() {
+        int released = 0;
+
+        ServiceOperationFuture request;
+        while ((request = cancelQueue.poll()) != null) {
+            // release every bound handle
+            Iterator<H> bound = boundHandles.values().iterator();
+            while (bound.hasNext()) {
+                H handle = bound.next();
+                try {
+                    unbind(handle);
+                    wakeup(); // wake up again to trigger thread death
+                } catch (Exception e) {
+                    ExceptionMonitor.getInstance().exceptionCaught(e);
+                }
+
+                released += 1;
+            }
+
+            boundHandles.clear();
+            request.setDone();
+        }
+
+        return released;
+    }
+
+    /**
+     * Not supported by this acceptor.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
+        throw new UnsupportedOperationException();
+    }
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/AbstractPollingIoAcceptor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java
URL: http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java?rev=596653&r1=596652&r2=596653&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketAcceptor.java Tue Nov 20 05:18:08 2007
@@ -23,36 +23,24 @@
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
-import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
+import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.mina.common.AbstractIoAcceptor;
+import org.apache.mina.common.AbstractPollingIoAcceptor;
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoProcessor;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.RuntimeIoException;
-import org.apache.mina.common.SimpleIoProcessorPool;
 import org.apache.mina.common.TransportMetadata;
 import org.apache.mina.transport.socket.DefaultSocketSessionConfig;
 import org.apache.mina.transport.socket.SocketAcceptor;
 import org.apache.mina.transport.socket.SocketSessionConfig;
-import org.apache.mina.util.NamePreservingRunnable;
-import org.apache.mina.util.NewThreadExecutor;
 
 /**
  * {@link IoAcceptor} for socket transport (TCP/IP).  This class
@@ -61,64 +49,36 @@
  * @author The Apache MINA Project (dev@mina.apache.org)
  * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
  */
-public class NioSocketAcceptor extends AbstractIoAcceptor implements SocketAcceptor {
-
-    private static final AtomicInteger id = new AtomicInteger();
+public class NioSocketAcceptor
+        extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel> 
+        implements SocketAcceptor {
 
     private int backlog = 50;
     private boolean reuseAddress = true;
 
-    private final Executor executor;
-    private final String threadName;
-    private final IoProcessor<NioSession> processor;
-    private final boolean createdProcessor;
-
-    private final Object lock = new Object();
-
-    private final Queue<ServiceOperationFuture> registerQueue =
-        new ConcurrentLinkedQueue<ServiceOperationFuture>();
-    private final Queue<ServiceOperationFuture> cancelQueue =
-        new ConcurrentLinkedQueue<ServiceOperationFuture>();
-
-    private final Map<SocketAddress, ServerSocketChannel> serverChannels =
-        Collections.synchronizedMap(new HashMap<SocketAddress, ServerSocketChannel>());
-    private final Selector selector;
-    private Worker worker;
+    private volatile Selector selector;
 
     /**
      * Create an acceptor with a single processing thread using a NewThreadExecutor
      */
     public NioSocketAcceptor() {
-        this(null, new SimpleIoProcessorPool<NioSession>(NioProcessor.class), true);
+        super(new DefaultSocketSessionConfig(), NioProcessor.class);
     }
 
     public NioSocketAcceptor(int processorCount) {
-        this(null, new SimpleIoProcessorPool<NioSession>(NioProcessor.class, processorCount), true);
+        super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
     }
     
     public NioSocketAcceptor(IoProcessor<NioSession> processor) {
-        this(null, processor, false);
+        super(new DefaultSocketSessionConfig(), processor);
     }
 
     public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
-        this(executor, processor, false);
+        super(new DefaultSocketSessionConfig(), executor, processor);
     }
 
-    private NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor, boolean createdProcessor) {
-        super(new DefaultSocketSessionConfig());
-        
-        if (executor == null) {
-            executor = new NewThreadExecutor();
-        }
-        if (processor == null) {
-            throw new NullPointerException("processor");
-        }
-        
-        this.executor = executor;
-        this.threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();
-        this.processor = processor;
-        this.createdProcessor = createdProcessor;
-        
+    @Override
+    protected void doInit() {
         // The default reuseAddress of an accepted socket should be 'true'.
         getSessionConfig().setReuseAddress(true);
 
@@ -143,23 +103,18 @@
         try {
             this.selector = Selector.open();
         } catch (IOException e) {
-            disposeNow();
+            doDispose0();
             throw new RuntimeIoException("Failed to open a selector.", e);
         }
     }
     
-    private void disposeNow() {
-        try {
-            if (createdProcessor) {
-                processor.dispose();
-            }
-        } finally {
-            if (selector != null) {
-                try {
-                    selector.close();
-                } catch (IOException e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-                }
+    @Override
+    protected void doDispose0() {
+        if (selector != null) {
+            try {
+                selector.close();
+            } catch (IOException e) {
+                ExceptionMonitor.getInstance().exceptionCaught(e);
             }
         }
     }
@@ -212,278 +167,99 @@
         }
     }
 
+    
     @Override
-    protected void doBind() throws Exception {
-        ServiceOperationFuture request = new ServiceOperationFuture();
-
-        // adds the Registration request to the queue for the Workers
-        // to handle
-        registerQueue.add(request);
-
-        // creates an instance of a Worker and has the local
-        // executor kick it off.
-        startupWorker();
+    protected NioSession accept(IoProcessor<NioSession> processor,
+            ServerSocketChannel handle) throws Exception {
 
-        selector.wakeup();
-
-        request.awaitUninterruptibly();
-
-        if (request.getException() != null) {
-            throw request.getException();
+        SelectionKey key = handle.keyFor(selector);
+        if (!key.isAcceptable()) {
+            return null;
         }
 
-        // Update the local addresses.
-        // setLocalAddresses() shouldn't be called from the worker thread
-        // because of deadlock.
-        Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();
-        for (ServerSocketChannel c: serverChannels.values()) {
-            newLocalAddresses.add(c.socket().getLocalSocketAddress());
+        // accept the connection from the client
+        SocketChannel ch = handle.accept();
+        if (ch == null) {
+            return null;
         }
-        setLocalAddresses(newLocalAddresses);
-    }
-
-    /**
-     * This method is called by the doBind() and doUnbind()
-     * methods.  If the worker object is not null, presumably
-     * the acceptor is starting up, then the worker object will
-     * be created and kicked off by the executor.  If the worker
-     * object is not null, probably already created and this class
-     * is now working, then nothing will happen and the method
-     * will just return.
-     */
-    private void startupWorker() {
-        if (!selector.isOpen()) {
-            registerQueue.clear();
-            cancelQueue.clear();
-            throw new ClosedSelectorException();
-        }
-        synchronized (lock) {
-            if (worker == null) {
-                worker = new Worker();
 
-                executor.execute(new NamePreservingRunnable(worker, threadName));
-            }
-        }
+        return new NioSocketSession(this, processor, ch);
     }
 
     @Override
-    protected void doUnbind() throws Exception {
-        ServiceOperationFuture future = new ServiceOperationFuture();
-
-        cancelQueue.add(future);
-        startupWorker();
-        selector.wakeup();
-
-        future.awaitUninterruptibly();
-        if (future.getException() != null) {
-            throw future.getException();
-        }
+    protected ServerSocketChannel bind(SocketAddress localAddress)
+            throws Exception {
+        ServerSocketChannel c = ServerSocketChannel.open();
+        c.configureBlocking(false);
+        // Configure the server socket,
+        c.socket().setReuseAddress(isReuseAddress());
+        c.socket().setReceiveBufferSize(
+                getSessionConfig().getReceiveBufferSize());
+        // and bind.
+        c.socket().bind(localAddress, getBacklog());
+        c.register(selector, SelectionKey.OP_ACCEPT);
+        return c;
     }
 
-    /**
-     * This class is called by the startupWorker() method and is
-     * placed into a NamePreservingRunnable class.
-     */
-    private class Worker implements Runnable {
-        public void run() {
-            for (;;) {
-                try {
-                    // gets the number of keys that are ready to go
-                    int nKeys = selector.select();
-
-                    // this actually sets the selector to OP_ACCEPT,
-                    // and binds to the port in which this class will
-                    // listen on
-                    registerNew();
-
-                    if (nKeys > 0) {
-                        processSessions(selector.selectedKeys());
-                    }
-
-                    // check to see if any cancellation request has been made.
-                    cancelKeys();
-
-                    if (selector.keys().isEmpty()) {
-                        synchronized (lock) {
-                            if (selector.keys().isEmpty()
-                                    && registerQueue.isEmpty()
-                                    && cancelQueue.isEmpty()) {
-                                worker = null;
-                                break;
-                            }
-                        }
-                    }
-                } catch (IOException e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-
-                    try {
-                        Thread.sleep(1000);
-                    } catch (InterruptedException e1) {
-                        ExceptionMonitor.getInstance().exceptionCaught(e1);
-                    }
-                }
-            }
-            
-            if (isDisposed()) {
-                disposeNow();
-            }
-        }
-
-        /**
-         * This method will process new sessions for the Worker class.  All
-         * keys that have had their status updates as per the Selector.selectedKeys()
-         * method will be processed here.  Only keys that are ready to accept
-         * connections are handled here.
-         * <p/>
-         * Session objects are created by making new instances of SocketSessionImpl
-         * and passing the session object to the SocketIoProcessor class.
-         */
-        private void processSessions(Set<SelectionKey> keys) throws IOException {
-            Iterator<SelectionKey> it = keys.iterator();
-            while (it.hasNext()) {
-                SelectionKey key = it.next();
-
-                it.remove();
-
-                if (!key.isAcceptable()) {
-                    continue;
-                }
+    @Override
+    protected SocketAddress localAddress(ServerSocketChannel handle)
+            throws Exception {
+        return handle.socket().getLocalSocketAddress();
+    }
 
-                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
+    @Override
+    protected boolean select() throws Exception {
+        return selector.select() > 0;
+    }
 
-                // accept the connection from the client
-                SocketChannel ch = ssc.accept();
+    @Override
+    protected boolean selectable() {
+        return selector.isOpen();
+    }
 
-                if (ch == null) {
-                    continue;
-                }
+    @Override
+    protected Iterator<ServerSocketChannel> selectedHandles() {
+        return new ServerSocketChannelIterator(selector.selectedKeys());
+    }
 
-                boolean success = false;
-                try {
-                    // Create a new session object.  This class extends
-                    // BaseIoSession and is custom for socket-based sessions.
-                    NioSocketSession session = new NioSocketSession(
-                            NioSocketAcceptor.this, processor, ch);
-                    
-                    finishSessionInitialization(session, null);
-
-                    // add the session to the SocketIoProcessor
-                    session.getProcessor().add(session);
-                    success = true;
-                } catch (Throwable t) {
-                    ExceptionMonitor.getInstance().exceptionCaught(t);
-                } finally {
-                    if (!success) {
-                        ch.close();
-                    }
-                }
-            }
+    @Override
+    protected void unbind(ServerSocketChannel handle) throws Exception {
+        SelectionKey key = handle.keyFor(selector);
+        if (key != null) {
+            key.cancel();
         }
+        handle.close();
     }
 
-    /**
-     * Sets up the socket communications.  Sets items such as:
-     * <p/>
-     * Blocking
-     * Reuse address
-     * Receive buffer size
-     * Bind to listen port
-     * Registers OP_ACCEPT for selector
-     */
-    private void registerNew() {
-        for (;;) {
-            ServiceOperationFuture future = registerQueue.poll();
-            if (future == null) {
-                break;
-            }
-
-            Map<SocketAddress, ServerSocketChannel> newServerChannels =
-                new HashMap<SocketAddress, ServerSocketChannel>();
-            List<SocketAddress> localAddresses = getLocalAddresses();
-            
-            try {
-                for (SocketAddress a: localAddresses) {
-                    ServerSocketChannel c = null;
-                    boolean success = false;
-                    try {
-                        c = ServerSocketChannel.open();
-                        c.configureBlocking(false);
-                        // Configure the server socket,
-                        c.socket().setReuseAddress(isReuseAddress());
-                        c.socket().setReceiveBufferSize(
-                                getSessionConfig().getReceiveBufferSize());
-                        // and bind.
-                        c.socket().bind(a, getBacklog());
-                        c.register(selector, SelectionKey.OP_ACCEPT, future);
-                        success = true;
-                    } finally {
-                        if (!success && c != null) {
-                            try {
-                                c.close();
-                            } catch (IOException e) {
-                                ExceptionMonitor.getInstance().exceptionCaught(e);
-                            }
-                        }
-                    }
-                    
-                    newServerChannels.put(c.socket().getLocalSocketAddress(), c);
-                }
-                
-                serverChannels.putAll(newServerChannels);
-
-                // and notify.
-                future.setDone();
-            } catch (Exception e) {
-                future.setException(e);
-            } finally {
-                // Roll back if failed to bind all addresses.
-                if (future.getException() != null) {
-                    for (ServerSocketChannel c: newServerChannels.values()) {
-                        c.keyFor(selector).cancel();
-                        try {
-                            c.close();
-                        } catch (IOException e) {
-                            ExceptionMonitor.getInstance().exceptionCaught(e);
-                        }
-                    }
-                    selector.wakeup();
-                }
-            }
-        }
+    @Override
+    protected void wakeup() {
+        selector.wakeup();
     }
 
-    /**
-     * This method just checks to see if anything has been placed into the
-     * cancellation queue.  The only thing that should be in the cancelQueue
-     * is CancellationRequest objects and the only place this happens is in
-     * the doUnbind() method.
-     */
-    private void cancelKeys() {
-        for (; ;) {
-            ServiceOperationFuture future = cancelQueue.poll();
-            if (future == null) {
-                break;
-            }
+    @Override
+    public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
+        throw new UnsupportedOperationException();
+    }
+    
+    private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
+        
+        private final Iterator<SelectionKey> i;
+        
+        private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
+            this.i = selectedKeys.iterator();
+        }
 
-            // close the channels
-            for (ServerSocketChannel c: serverChannels.values()) {
-                try {
-                    SelectionKey key = c.keyFor(selector);
-                    key.cancel();
+        public boolean hasNext() {
+            return i.hasNext();
+        }
 
-                    selector.wakeup(); // wake up again to trigger thread death
-                    c.close();
-                } catch (IOException e) {
-                    ExceptionMonitor.getInstance().exceptionCaught(e);
-                }
-            }
-            
-            serverChannels.clear();
-            future.setDone();
+        public ServerSocketChannel next() {
+            SelectionKey key = i.next();
+            return (ServerSocketChannel) key.channel();
         }
-    }
 
-    public IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress) {
-        throw new UnsupportedOperationException();
+        public void remove() {
+            i.remove();
+        }
     }
 }