You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/12/28 03:49:51 UTC

svn commit: r359355 [2/4] - in /directory/network/trunk: ./ src/java/org/apache/mina/common/ src/java/org/apache/mina/transport/socket/nio/support/ src/java/org/apache/mina/transport/vmpipe/support/ src/test/org/apache/mina/common/ xdocs/

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorDelegate.java Tue Dec 27 18:49:31 2005
@@ -1,675 +1,675 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoConnector;
-import org.apache.mina.util.Queue;
-
-/**
- * {@link IoConnector} for datagram transport (UDP/IP).
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramSessionManager
-{
-    private static volatile int nextId = 0;
-
-    private final IoConnector wrapper;
-    private final int id = nextId ++ ;
-    private Selector selector;
-    private boolean broadcast;
-    private boolean reuseAddress;
-    private int receiveBufferSize = -1;
-    private int sendBufferSize = -1;
-    private int trafficClass = -1;
-    private final Queue registerQueue = new Queue();
-    private final Queue cancelQueue = new Queue();
-    private final Queue flushingSessions = new Queue();
-    private final Queue trafficControllingSessions = new Queue();
-    private Worker worker;
-
-    /**
-     * Creates a new instance.
-     */
-    public DatagramConnectorDelegate( IoConnector wrapper )
-    {
-        this.wrapper = wrapper;
-    }
-
-    public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
-    {
-        return connect( address, null, handler, filterChainBuilder );
-    }
-
-    public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
-                                  IoHandler handler, IoFilterChainBuilder filterChainBuilder )
-    {
-        if( address == null )
-            throw new NullPointerException( "address" );
-        if( handler == null )
-            throw new NullPointerException( "handler" );
-
-        if( !( address instanceof InetSocketAddress ) )
-            throw new IllegalArgumentException( "Unexpected address type: "
-                                                + address.getClass() );
-        
-        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
-        {
-            throw new IllegalArgumentException( "Unexpected local address type: "
-                                                + localAddress.getClass() );
-        }
-        
-        if( filterChainBuilder == null )
-        {
-            filterChainBuilder = IoFilterChainBuilder.NOOP;
-        }
-        
-        DatagramChannel ch = null;
-        boolean initialized = false;
-        try
-        {
-            ch = DatagramChannel.open();
-            ch.socket().setReuseAddress( reuseAddress );
-            ch.socket().setBroadcast( broadcast );
-            if( receiveBufferSize > 0 )
-            {
-                ch.socket().setReceiveBufferSize( receiveBufferSize );
-            }
-            if( sendBufferSize > 0 )
-            {
-                ch.socket().setSendBufferSize( sendBufferSize );
-            }
-            if( trafficClass > 0 )
-            {
-                ch.socket().setTrafficClass( trafficClass );
-            }
-            if( localAddress != null )
-            {
-                ch.socket().bind( localAddress );
-            }
-            ch.connect( address );
-            ch.configureBlocking( false );
-            initialized = true;
-        }
-        catch( IOException e )
-        {
-            return ConnectFuture.newFailedFuture( e );
-        }
-        finally
-        {
-            if( !initialized && ch != null )
-            {
-                try
-                {
-                    ch.disconnect();
-                    ch.close();
-                }
-                catch( IOException e )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught( e );
-                }
-            }
-        }
-
-        RegistrationRequest request = new RegistrationRequest( ch, handler, filterChainBuilder );
-        synchronized( this )
-        {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                try
-                {
-                    ch.disconnect();
-                    ch.close();
-                }
-                catch( IOException e2 )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught( e2 );
-                }
-
-                return ConnectFuture.newFailedFuture( e );
-            }
-            
-            synchronized( registerQueue )
-            {
-                registerQueue.push( request );
-            }
-        }
-
-        selector.wakeup();
-        return request;
-    }
-    
-    public boolean getBroadcast()
-    {
-        return broadcast;
-    }
-    
-    public void setBroadcast( boolean broadcast )
-    {
-        this.broadcast = broadcast;
-    }
-    
-    public boolean getReuseAddress()
-    {
-        return reuseAddress;
-    }
-    
-    public void setReuseAddress( boolean reuseAddress )
-    {
-        this.reuseAddress = reuseAddress;
-    }
-
-    public int getReceiveBufferSize()
-    {
-        return receiveBufferSize;
-    }
-
-    public void setReceiveBufferSize( int receiveBufferSize )
-    {
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-    public int getSendBufferSize()
-    {
-        return sendBufferSize;
-    }
-
-    public void setSendBufferSize( int sendBufferSize )
-    {
-        this.sendBufferSize = sendBufferSize;
-    }
-
-    public int getTrafficClass()
-    {
-        return trafficClass;
-    }
-
-    public void setTrafficClass( int trafficClass )
-    {
-        this.trafficClass = trafficClass;
-    }
-
-    private synchronized void startupWorker() throws IOException
-    {
-        if( worker == null )
-        {
-            selector = Selector.open();
-            worker = new Worker();
-            worker.start();
-        }
-    }
-
-    public void closeSession( DatagramSessionImpl session )
-    {
-        synchronized( this )
-        {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                // IOException is thrown only when Worker thread is not
-                // running and failed to open a selector.  We simply return
-                // silently here because it we can simply conclude that
-                // this session is not managed by this connector or
-                // already closed.
-                return;
-            }
-
-            synchronized( cancelQueue )
-            {
-                cancelQueue.push( session );
-            }
-        }
-
-        selector.wakeup();
-    }
-
-    public void flushSession( DatagramSessionImpl session )
-    {
-        scheduleFlush( session );
-        Selector selector = this.selector;
-        if( selector != null )
-        {
-            selector.wakeup();
-        }
-    }
-
-    private void scheduleFlush( DatagramSessionImpl session )
-    {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
-    }
-
-    public void updateTrafficMask( DatagramSessionImpl session )
-    {
-        scheduleTrafficControl( session );
-        Selector selector = this.selector;
-        if( selector != null )
-        {
-            selector.wakeup();
-        }
-        selector.wakeup();
-    }
-    
-    private void scheduleTrafficControl( DatagramSessionImpl session )
-    {
-        synchronized( trafficControllingSessions )
-        {
-            trafficControllingSessions.push( session );
-        }
-    }
-    
-    private void doUpdateTrafficMask() 
-    {
-        if( trafficControllingSessions.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            DatagramSessionImpl session;
-
-            synchronized( trafficControllingSessions )
-            {
-                session = ( DatagramSessionImpl ) trafficControllingSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            SelectionKey key = session.getSelectionKey();
-            // Retry later if session is not yet fully initialized.
-            // (In case that Session.suspend??() or session.resume??() is 
-            // called before addSession() is processed)
-            if( key == null )
-            {
-                scheduleTrafficControl( session );
-                break;
-            }
-            // skip if channel is already closed
-            if( !key.isValid() )
-            {
-                continue;
-            }
-
-            // The normal is OP_READ and, if there are write requests in the
-            // session's write queue, set OP_WRITE to trigger flushing.
-            int ops = SelectionKey.OP_READ;
-            Queue writeRequestQueue = session.getWriteRequestQueue();
-            synchronized( writeRequestQueue )
-            {
-                if( !writeRequestQueue.isEmpty() )
-                {
-                    ops |= SelectionKey.OP_WRITE;
-                }
-            }
-
-            // Now mask the preferred ops with the mask of the current session
-            int mask = session.getTrafficMask().getInterestOps();
-            key.interestOps( ops & mask );
-        }
-    }
-    
-    private class Worker extends Thread
-    {
-        public Worker()
-        {
-            super( "DatagramConnector-" + id );
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                try
-                {
-                    int nKeys = selector.select();
-
-                    registerNew();
-                    doUpdateTrafficMask();
-
-                    if( nKeys > 0 )
-                    {
-                        processReadySessions( selector.selectedKeys() );
-                    }
-
-                    flushSessions();
-                    cancelKeys();
-
-                    if( selector.keys().isEmpty() )
-                    {
-                        synchronized( DatagramConnectorDelegate.this )
-                        {
-                            if( selector.keys().isEmpty() &&
-                                registerQueue.isEmpty() &&
-                                cancelQueue.isEmpty() )
-                            {
-                                worker = null;
-                                try
-                                {
-                                    selector.close();
-                                }
-                                catch( IOException e )
-                                {
-                                    ExceptionMonitor.getInstance().exceptionCaught( e );
-                                }
-                                finally
-                                {
-                                    selector = null;
-                                }
-                                break;
-                            }
-                        }
-                    }
-                }
-                catch( IOException e )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught(  e );
-
-                    try
-                    {
-                        Thread.sleep( 1000 );
-                    }
-                    catch( InterruptedException e1 )
-                    {
-                    }
-                }
-            }
-        }
-    }
-
-    private void processReadySessions( Set keys )
-    {
-        Iterator it = keys.iterator();
-        while( it.hasNext() )
-        {
-            SelectionKey key = ( SelectionKey ) it.next();
-            it.remove();
-
-            DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
-
-            if( key.isReadable() && session.getTrafficMask().isReadable() )
-            {
-                readSession( session );
-            }
-
-            if( key.isWritable() && session.getTrafficMask().isWritable() )
-            {
-                scheduleFlush( session );
-            }
-        }
-    }
-
-    private void readSession( DatagramSessionImpl session )
-    {
-
-        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
-        try
-        {
-            int readBytes = session.getChannel().read( readBuf.buf() );
-            if( readBytes > 0 )
-            {
-                readBuf.flip();
-                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
-                newBuf.put( readBuf );
-                newBuf.flip();
-
-                session.increaseReadBytes( readBytes );
-                ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
-            }
-        }
-        catch( IOException e )
-        {
-            ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
-        }
-        finally
-        {
-            readBuf.release();
-        }
-    }
-
-    private void flushSessions()
-    {
-        if( flushingSessions.size() == 0 )
-            return;
-
-        for( ;; )
-        {
-            DatagramSessionImpl session;
-
-            synchronized( flushingSessions )
-            {
-                session = ( DatagramSessionImpl ) flushingSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            try
-            {
-                flush( session );
-            }
-            catch( IOException e )
-            {
-                ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
-            }
-        }
-    }
-
-    private void flush( DatagramSessionImpl session ) throws IOException
-    {
-        DatagramChannel ch = session.getChannel();
-
-        Queue writeRequestQueue = session.getWriteRequestQueue();
-
-        WriteRequest req;
-        for( ;; )
-        {
-            synchronized( writeRequestQueue )
-            {
-                req = ( WriteRequest ) writeRequestQueue.first();
-            }
-
-            if( req == null )
-                break;
-
-            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
-            if( buf.remaining() == 0 )
-            {
-                // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
-
-                req.getFuture().setWritten( true );
-                session.increaseWrittenWriteRequests();
-                ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf );
-                continue;
-            }
-
-            SelectionKey key = session.getSelectionKey();
-            if( key == null )
-            {
-                scheduleFlush( session );
-                break;
-            }
-            if( !key.isValid() )
-            {
-                continue;
-            }
-
-            int pos = buf.position();
-            int writtenBytes = ch.write( buf.buf() );
-
-            if( writtenBytes == 0 )
-            {
-                // Kernel buffer is full
-                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
-            }
-            else if( writtenBytes > 0 )
-            {
-                key.interestOps( key.interestOps()
-                                 & ( ~SelectionKey.OP_WRITE ) );
-
-                // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
-
-                session.increaseWrittenBytes( writtenBytes );
-                req.getFuture().setWritten( true );
-                session.increaseWrittenWriteRequests();
-                ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) );
-            }
-        }
-    }
-
-    private void registerNew()
-    {
-        if( registerQueue.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            RegistrationRequest req;
-            synchronized( registerQueue )
-            {
-                req = ( RegistrationRequest ) registerQueue.pop();
-            }
-
-            if( req == null )
-                break;
-
-            DatagramSessionImpl session =
-                new DatagramSessionImpl( wrapper, this, req.channel, req.handler );
-
-            boolean success = false;
-            try
-            {
-                this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
-                req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
-                ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session );
-
-                SelectionKey key = req.channel.register( selector,
-                        SelectionKey.OP_READ, session );
-    
-                session.setSelectionKey( key );
-
-                req.setSession( session );
-                success = true;
-            }
-            catch( Throwable t )
-            {
-                req.setException( t );
-            }
-            finally 
-            {
-                if( !success )
-                {
-                    try
-                    {
-                        req.channel.disconnect();
-                        req.channel.close();
-                    }
-                    catch (IOException e)
-                    {
-                        ExceptionMonitor.getInstance().exceptionCaught( e );
-                    }
-                }
-            }
-        }
-    }
-
-    private void cancelKeys()
-    {
-        if( cancelQueue.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            DatagramSessionImpl session;
-            synchronized( cancelQueue )
-            {
-                session = ( DatagramSessionImpl ) cancelQueue.pop();
-            }
-
-            if( session == null )
-                break;
-            else
-            {
-                SelectionKey key = session.getSelectionKey();
-                DatagramChannel ch = ( DatagramChannel ) key.channel();
-                try
-                {
-                    ch.disconnect();
-                    ch.close();
-                }
-                catch( IOException e )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught( e );
-                }
-                session.getCloseFuture().setClosed();
-                key.cancel();
-                selector.wakeup(); // wake up again to trigger thread death
-            }
-        }
-    }
-
-    private static class RegistrationRequest extends ConnectFuture
-    {
-        private final DatagramChannel channel;
-        private final IoHandler handler;
-        private final IoFilterChainBuilder filterChainBuilder;
-
-        private RegistrationRequest( DatagramChannel channel,
-                                     IoHandler handler,
-                                     IoFilterChainBuilder filterChainBuilder )
-        {
-            this.channel = channel;
-            this.handler = handler;
-            this.filterChainBuilder = filterChainBuilder;
-        }
-    }
-}
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for datagram transport (UDP/IP).
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramConnectorDelegate extends BaseIoConnector implements DatagramSessionManager
+{
+    private static volatile int nextId = 0;
+
+    private final IoConnector wrapper;
+    private final int id = nextId ++ ;
+    private Selector selector;
+    private boolean broadcast;
+    private boolean reuseAddress;
+    private int receiveBufferSize = -1;
+    private int sendBufferSize = -1;
+    private int trafficClass = -1;
+    private final Queue registerQueue = new Queue();
+    private final Queue cancelQueue = new Queue();
+    private final Queue flushingSessions = new Queue();
+    private final Queue trafficControllingSessions = new Queue();
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     */
+    public DatagramConnectorDelegate( IoConnector wrapper )
+    {
+        this.wrapper = wrapper;
+    }
+
+    public ConnectFuture connect( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+    {
+        return connect( address, null, handler, filterChainBuilder );
+    }
+
+    public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+                                  IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+
+        if( !( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+        
+        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+        {
+            throw new IllegalArgumentException( "Unexpected local address type: "
+                                                + localAddress.getClass() );
+        }
+        
+        if( filterChainBuilder == null )
+        {
+            filterChainBuilder = IoFilterChainBuilder.NOOP;
+        }
+        
+        DatagramChannel ch = null;
+        boolean initialized = false;
+        try
+        {
+            ch = DatagramChannel.open();
+            ch.socket().setReuseAddress( reuseAddress );
+            ch.socket().setBroadcast( broadcast );
+            if( receiveBufferSize > 0 )
+            {
+                ch.socket().setReceiveBufferSize( receiveBufferSize );
+            }
+            if( sendBufferSize > 0 )
+            {
+                ch.socket().setSendBufferSize( sendBufferSize );
+            }
+            if( trafficClass > 0 )
+            {
+                ch.socket().setTrafficClass( trafficClass );
+            }
+            if( localAddress != null )
+            {
+                ch.socket().bind( localAddress );
+            }
+            ch.connect( address );
+            ch.configureBlocking( false );
+            initialized = true;
+        }
+        catch( IOException e )
+        {
+            return ConnectFuture.newFailedFuture( e );
+        }
+        finally
+        {
+            if( !initialized && ch != null )
+            {
+                try
+                {
+                    ch.disconnect();
+                    ch.close();
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+            }
+        }
+
+        RegistrationRequest request = new RegistrationRequest( ch, handler, filterChainBuilder );
+        synchronized( this )
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch( IOException e )
+            {
+                try
+                {
+                    ch.disconnect();
+                    ch.close();
+                }
+                catch( IOException e2 )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e2 );
+                }
+
+                return ConnectFuture.newFailedFuture( e );
+            }
+            
+            synchronized( registerQueue )
+            {
+                registerQueue.push( request );
+            }
+        }
+
+        selector.wakeup();
+        return request;
+    }
+    
+    public boolean getBroadcast()
+    {
+        return broadcast;
+    }
+    
+    public void setBroadcast( boolean broadcast )
+    {
+        this.broadcast = broadcast;
+    }
+    
+    public boolean getReuseAddress()
+    {
+        return reuseAddress;
+    }
+    
+    public void setReuseAddress( boolean reuseAddress )
+    {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public int getReceiveBufferSize()
+    {
+        return receiveBufferSize;
+    }
+
+    public void setReceiveBufferSize( int receiveBufferSize )
+    {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public int getSendBufferSize()
+    {
+        return sendBufferSize;
+    }
+
+    public void setSendBufferSize( int sendBufferSize )
+    {
+        this.sendBufferSize = sendBufferSize;
+    }
+
+    public int getTrafficClass()
+    {
+        return trafficClass;
+    }
+
+    public void setTrafficClass( int trafficClass )
+    {
+        this.trafficClass = trafficClass;
+    }
+
+    private synchronized void startupWorker() throws IOException
+    {
+        if( worker == null )
+        {
+            selector = Selector.open();
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    public void closeSession( DatagramSessionImpl session )
+    {
+        synchronized( this )
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch( IOException e )
+            {
+                // IOException is thrown only when Worker thread is not
+                // running and failed to open a selector.  We simply return
+                // silently here because it we can simply conclude that
+                // this session is not managed by this connector or
+                // already closed.
+                return;
+            }
+
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( session );
+            }
+        }
+
+        selector.wakeup();
+    }
+
+    public void flushSession( DatagramSessionImpl session )
+    {
+        scheduleFlush( session );
+        Selector selector = this.selector;
+        if( selector != null )
+        {
+            selector.wakeup();
+        }
+    }
+
+    private void scheduleFlush( DatagramSessionImpl session )
+    {
+        synchronized( flushingSessions )
+        {
+            flushingSessions.push( session );
+        }
+    }
+
+    public void updateTrafficMask( DatagramSessionImpl session )
+    {
+        scheduleTrafficControl( session );
+        Selector selector = this.selector;
+        if( selector != null )
+        {
+            selector.wakeup();
+        }
+        selector.wakeup();
+    }
+    
+    private void scheduleTrafficControl( DatagramSessionImpl session )
+    {
+        synchronized( trafficControllingSessions )
+        {
+            trafficControllingSessions.push( session );
+        }
+    }
+    
+    private void doUpdateTrafficMask() 
+    {
+        if( trafficControllingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            DatagramSessionImpl session;
+
+            synchronized( trafficControllingSessions )
+            {
+                session = ( DatagramSessionImpl ) trafficControllingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SelectionKey key = session.getSelectionKey();
+            // Retry later if session is not yet fully initialized.
+            // (In case that Session.suspend??() or session.resume??() is 
+            // called before addSession() is processed)
+            if( key == null )
+            {
+                scheduleTrafficControl( session );
+                break;
+            }
+            // skip if channel is already closed
+            if( !key.isValid() )
+            {
+                continue;
+            }
+
+            // The normal is OP_READ and, if there are write requests in the
+            // session's write queue, set OP_WRITE to trigger flushing.
+            int ops = SelectionKey.OP_READ;
+            Queue writeRequestQueue = session.getWriteRequestQueue();
+            synchronized( writeRequestQueue )
+            {
+                if( !writeRequestQueue.isEmpty() )
+                {
+                    ops |= SelectionKey.OP_WRITE;
+                }
+            }
+
+            // Now mask the preferred ops with the mask of the current session
+            int mask = session.getTrafficMask().getInterestOps();
+            key.interestOps( ops & mask );
+        }
+    }
+    
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "DatagramConnector-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+                    doUpdateTrafficMask();
+
+                    if( nKeys > 0 )
+                    {
+                        processReadySessions( selector.selectedKeys() );
+                    }
+
+                    flushSessions();
+                    cancelKeys();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( DatagramConnectorDelegate.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught(  e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    private void processReadySessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+            it.remove();
+
+            DatagramSessionImpl session = ( DatagramSessionImpl ) key.attachment();
+
+            if( key.isReadable() && session.getTrafficMask().isReadable() )
+            {
+                readSession( session );
+            }
+
+            if( key.isWritable() && session.getTrafficMask().isWritable() )
+            {
+                scheduleFlush( session );
+            }
+        }
+    }
+
+    private void readSession( DatagramSessionImpl session )
+    {
+
+        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
+        try
+        {
+            int readBytes = session.getChannel().read( readBuf.buf() );
+            if( readBytes > 0 )
+            {
+                readBuf.flip();
+                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
+                newBuf.put( readBuf );
+                newBuf.flip();
+
+                session.increaseReadBytes( readBytes );
+                ( ( DatagramFilterChain ) session.getFilterChain() ).messageReceived( session, newBuf );
+            }
+        }
+        catch( IOException e )
+        {
+            ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+        }
+        finally
+        {
+            readBuf.release();
+        }
+    }
+
+    private void flushSessions()
+    {
+        if( flushingSessions.size() == 0 )
+            return;
+
+        for( ;; )
+        {
+            DatagramSessionImpl session;
+
+            synchronized( flushingSessions )
+            {
+                session = ( DatagramSessionImpl ) flushingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            try
+            {
+                flush( session );
+            }
+            catch( IOException e )
+            {
+                ( ( DatagramFilterChain ) session.getFilterChain() ).exceptionCaught( session, e );
+            }
+        }
+    }
+
+    private void flush( DatagramSessionImpl session ) throws IOException
+    {
+        DatagramChannel ch = session.getChannel();
+
+        Queue writeRequestQueue = session.getWriteRequestQueue();
+
+        WriteRequest req;
+        for( ;; )
+        {
+            synchronized( writeRequestQueue )
+            {
+                req = ( WriteRequest ) writeRequestQueue.first();
+            }
+
+            if( req == null )
+                break;
+
+            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+            if( buf.remaining() == 0 )
+            {
+                // pop and fire event
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+
+                req.getFuture().setWritten( true );
+                session.increaseWrittenWriteRequests();
+                ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf );
+                continue;
+            }
+
+            SelectionKey key = session.getSelectionKey();
+            if( key == null )
+            {
+                scheduleFlush( session );
+                break;
+            }
+            if( !key.isValid() )
+            {
+                continue;
+            }
+
+            int pos = buf.position();
+            int writtenBytes = ch.write( buf.buf() );
+
+            if( writtenBytes == 0 )
+            {
+                // Kernel buffer is full
+                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+            }
+            else if( writtenBytes > 0 )
+            {
+                key.interestOps( key.interestOps()
+                                 & ( ~SelectionKey.OP_WRITE ) );
+
+                // pop and fire event
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+
+                session.increaseWrittenBytes( writtenBytes );
+                req.getFuture().setWritten( true );
+                session.increaseWrittenWriteRequests();
+                ( ( DatagramFilterChain ) session.getFilterChain() ).messageSent( session, buf.position( pos ) );
+            }
+        }
+    }
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+                break;
+
+            DatagramSessionImpl session =
+                new DatagramSessionImpl( wrapper, this, req.channel, req.handler );
+
+            boolean success = false;
+            try
+            {
+                this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+                req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+                ( ( DatagramFilterChain ) session.getFilterChain() ).sessionCreated( session );
+
+                SelectionKey key = req.channel.register( selector,
+                        SelectionKey.OP_READ, session );
+    
+                session.setSelectionKey( key );
+
+                req.setSession( session );
+                success = true;
+            }
+            catch( Throwable t )
+            {
+                req.setException( t );
+            }
+            finally 
+            {
+                if( !success )
+                {
+                    try
+                    {
+                        req.channel.disconnect();
+                        req.channel.close();
+                    }
+                    catch (IOException e)
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e );
+                    }
+                }
+            }
+        }
+    }
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            DatagramSessionImpl session;
+            synchronized( cancelQueue )
+            {
+                session = ( DatagramSessionImpl ) cancelQueue.pop();
+            }
+
+            if( session == null )
+                break;
+            else
+            {
+                SelectionKey key = session.getSelectionKey();
+                DatagramChannel ch = ( DatagramChannel ) key.channel();
+                try
+                {
+                    ch.disconnect();
+                    ch.close();
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+                session.getCloseFuture().setClosed();
+                key.cancel();
+                selector.wakeup(); // wake up again to trigger thread death
+            }
+        }
+    }
+
+    private static class RegistrationRequest extends ConnectFuture
+    {
+        private final DatagramChannel channel;
+        private final IoHandler handler;
+        private final IoFilterChainBuilder filterChainBuilder;
+
+        private RegistrationRequest( DatagramChannel channel,
+                                     IoHandler handler,
+                                     IoFilterChainBuilder filterChainBuilder )
+        {
+            this.channel = channel;
+            this.handler = handler;
+            this.filterChainBuilder = filterChainBuilder;
+        }
+    }
+}

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramFilterChain.java Tue Dec 27 18:49:31 2005
@@ -1,69 +1,69 @@
-/*
- *   @(#) $Id: DatagramConnectorDelegate.java 351888 2005-12-03 04:39:53Z trustin $
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import org.apache.mina.common.CloseFuture;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.AbstractIoFilterChain;
-import org.apache.mina.util.Queue;
-
-/**
- * An {@link IoFilterChain} for datagram transport (UDP/IP).
- * 
- * @author The Apache Directory Project
- */
-class DatagramFilterChain extends AbstractIoFilterChain {
-
-    DatagramFilterChain( IoSession parent )
-    {
-        super( parent );
-    }
-    
-    protected void doWrite( IoSession session, WriteRequest writeRequest )
-    {
-        DatagramSessionImpl s = ( DatagramSessionImpl ) session;
-        Queue writeRequestQueue = s.getWriteRequestQueue();
-        
-        synchronized( writeRequestQueue )
-        {
-            writeRequestQueue.push( writeRequest );
-            if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
-            {
-                // Notify DatagramSessionManager only when writeRequestQueue was empty.
-                s.getManagerDelegate().flushSession( s );
-            }
-        }
-    }
-
-    protected void doClose( IoSession session, CloseFuture closeFuture )
-    {
-        DatagramSessionImpl s = ( DatagramSessionImpl ) session;
-        DatagramSessionManager manager = s.getManagerDelegate();
-        if( manager instanceof DatagramConnectorDelegate )
-        {
-            ( ( DatagramConnectorDelegate ) manager ).closeSession( s );
-        }
-        else
-        {
-            closeFuture.setClosed();
-        }
-    }
-}
+/*
+ *   @(#) $Id: DatagramConnectorDelegate.java 351888 2005-12-03 04:39:53Z trustin $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import org.apache.mina.common.CloseFuture;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.AbstractIoFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for datagram transport (UDP/IP).
+ * 
+ * @author The Apache Directory Project
+ */
+class DatagramFilterChain extends AbstractIoFilterChain {
+
+    DatagramFilterChain( IoSession parent )
+    {
+        super( parent );
+    }
+    
+    protected void doWrite( IoSession session, WriteRequest writeRequest )
+    {
+        DatagramSessionImpl s = ( DatagramSessionImpl ) session;
+        Queue writeRequestQueue = s.getWriteRequestQueue();
+        
+        synchronized( writeRequestQueue )
+        {
+            writeRequestQueue.push( writeRequest );
+            if( writeRequestQueue.size() == 1 && session.getTrafficMask().isWritable() )
+            {
+                // Notify DatagramSessionManager only when writeRequestQueue was empty.
+                s.getManagerDelegate().flushSession( s );
+            }
+        }
+    }
+
+    protected void doClose( IoSession session, CloseFuture closeFuture )
+    {
+        DatagramSessionImpl s = ( DatagramSessionImpl ) session;
+        DatagramSessionManager manager = s.getManagerDelegate();
+        if( manager instanceof DatagramConnectorDelegate )
+        {
+            ( ( DatagramConnectorDelegate ) manager ).closeSession( s );
+        }
+        else
+        {
+            closeFuture.setClosed();
+        }
+    }
+}

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java?rev=359355&r1=359354&r2=359355&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java Tue Dec 27 18:49:31 2005
@@ -1,514 +1,514 @@
-/*
- *   @(#) $Id$
- *
- *   Copyright 2004 The Apache Software Foundation
- *
- *   Licensed under the Apache License, Version 2.0 (the "License");
- *   you may not use this file except in compliance with the License.
- *   You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- *
- */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IoAcceptor;
-import org.apache.mina.common.IoFilterChainBuilder;
-import org.apache.mina.common.IoHandler;
-import org.apache.mina.common.support.BaseIoAcceptor;
-import org.apache.mina.transport.socket.nio.SocketSessionManager;
-import org.apache.mina.util.Queue;
-
-/**
- * {@link IoAcceptor} for socket transport (TCP/IP).
- * 
- * @author The Apache Directory Project (dev@directory.apache.org)
- * @version $Rev$, $Date$
- */
-public class SocketAcceptorDelegate extends BaseIoAcceptor implements SocketSessionManager
-{
-    private static volatile int nextId = 0;
-
-    private final IoAcceptor wrapper;
-    private final int id = nextId ++ ;
-    private final String threadName = "SocketAcceptor-" + id;
-    private boolean reuseAddress = false;
-    private int backlog = 50;
-    private int receiveBufferSize = -1;
-    private Selector selector;
-    private final Map channels = new HashMap();
-
-    private final Queue registerQueue = new Queue();
-    private final Queue cancelQueue = new Queue();
-    
-    private Worker worker;
-
-    /**
-     * Creates a new instance.
-     */
-    public SocketAcceptorDelegate( IoAcceptor wrapper )
-    {
-        this.wrapper = wrapper;
-    }
-
-    /**
-     * Binds to the specified <code>address</code> and handles incoming
-     * connections with the specified <code>handler</code>.  Backlog value
-     * is configured to the value of <code>backlog</code> property.
-     *
-     * @throws IOException if failed to bind
-     */
-    public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
-    {
-        if( address == null )
-        {
-            throw new NullPointerException( "address" );
-        }
-
-        if( handler == null )
-        {
-            throw new NullPointerException( "handler" );
-        }
-
-        if( !( address instanceof InetSocketAddress ) )
-        {
-            throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
-        }
-
-        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
-        {
-            throw new IllegalArgumentException( "Unsupported port number: 0" );
-        }
-        
-        if( filterChainBuilder == null )
-        {
-            filterChainBuilder = IoFilterChainBuilder.NOOP;
-        }
-        
-        RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder );
-
-        synchronized( this )
-        {
-            synchronized( registerQueue )
-            {
-                registerQueue.push( request );
-            }
-            startupWorker();
-        }
-        
-        selector.wakeup();
-        
-        synchronized( request )
-        {
-            while( !request.done )
-            {
-                try
-                {
-                    request.wait();
-                }
-                catch( InterruptedException e )
-                {
-                }
-            }
-        }
-        
-        if( request.exception != null )
-        {
-            throw request.exception;
-        }
-    }
-
-
-    private synchronized void startupWorker() throws IOException
-    {
-        if( worker == null )
-        {
-            selector = Selector.open();
-            worker = new Worker();
-
-            worker.start();
-        }
-    }
-
-    public void unbind( SocketAddress address )
-    {
-        // TODO: DIRMINA-93
-        if( address == null )
-        {
-            throw new NullPointerException( "address" );
-        }
-
-        CancellationRequest request = new CancellationRequest( address );
-        synchronized( this )
-        {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                // IOException is thrown only when Worker thread is not
-                // running and failed to open a selector.  We simply throw
-                // IllegalArgumentException here because we can simply
-                // conclude that nothing is bound to the selector.
-                throw new IllegalArgumentException( "Address not bound: " + address );
-            }
-
-            synchronized( cancelQueue )
-            {
-                cancelQueue.push( request );
-            }
-        }
-        
-        selector.wakeup();
-
-        synchronized( request )
-        {
-            while( !request.done )
-            {
-                try
-                {
-                    request.wait();
-                }
-                catch( InterruptedException e )
-                {
-                }
-            }
-        }
-        
-        if( request.exception != null )
-        {
-            request.exception.fillInStackTrace();
-
-            throw request.exception;
-        }
-    }
-    
-    private class Worker extends Thread
-    {
-        public Worker()
-        {
-            super( SocketAcceptorDelegate.this.threadName );
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                try
-                {
-                    int nKeys = selector.select();
-
-                    registerNew();
-                    cancelKeys();
-
-                    if( nKeys > 0 )
-                    {
-                        processSessions( selector.selectedKeys() );
-                    }
-
-                    if( selector.keys().isEmpty() )
-                    {
-                        synchronized( SocketAcceptorDelegate.this )
-                        {
-                            if( selector.keys().isEmpty() &&
-                                registerQueue.isEmpty() &&
-                                cancelQueue.isEmpty() )
-                            {
-                                worker = null;
-                                try
-                                {
-                                    selector.close();
-                                }
-                                catch( IOException e )
-                                {
-                                    ExceptionMonitor.getInstance().exceptionCaught( e );
-                                }
-                                finally
-                                {
-                                    selector = null;
-                                }
-                                break;
-                            }
-                        }
-                    }
-                }
-                catch( IOException e )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught( e );
-
-                    try
-                    {
-                        Thread.sleep( 1000 );
-                    }
-                    catch( InterruptedException e1 )
-                    {
-                    }
-                }
-            }
-        }
-
-        private void processSessions( Set keys ) throws IOException
-        {
-            Iterator it = keys.iterator();
-            while( it.hasNext() )
-            {
-                SelectionKey key = ( SelectionKey ) it.next();
-   
-                it.remove();
-   
-                if( !key.isAcceptable() )
-                {
-                    continue;
-                }
-   
-                ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
-   
-                SocketChannel ch = ssc.accept();
-   
-                if( ch == null )
-                {
-                    continue;
-                }
-   
-                boolean success = false;
-                try
-                {
-                    RegistrationRequest req = ( RegistrationRequest ) key.attachment();
-                    SocketSessionImpl session = new SocketSessionImpl( SocketAcceptorDelegate.this.wrapper, ch, req.handler );
-                    SocketAcceptorDelegate.this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
-                    req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
-                    ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
-                    session.getIoProcessor().addNew( session );
-                    success = true;
-                }
-                catch( Throwable t )
-                {
-                    ExceptionMonitor.getInstance().exceptionCaught( t );
-                }
-                finally
-                {
-                    if( !success )
-                    {
-                        ch.close();
-                    }
-                }
-            }
-        }
-    }
-
-
-    private void registerNew()
-    {
-        if( registerQueue.isEmpty() )
-        {
-            return;
-        }
-
-        for( ;; )
-        {
-            RegistrationRequest req;
-
-            synchronized( registerQueue )
-            {
-                req = ( RegistrationRequest ) registerQueue.pop();
-            }
-
-            if( req == null )
-            {
-                break;
-            }
-
-            ServerSocketChannel ssc = null;
-
-            try
-            {
-                ssc = ServerSocketChannel.open();
-                ssc.configureBlocking( false );
-                
-                // Configure the server socket,
-                ssc.socket().setReuseAddress( isReuseAddress() );
-                if( getReceiveBufferSize() > 0 )
-                {
-                    ssc.socket().setReceiveBufferSize( getReceiveBufferSize() );
-                }
-                
-                // and bind.
-                ssc.socket().bind( req.address, getBacklog() );
-                ssc.register( selector, SelectionKey.OP_ACCEPT, req );
-
-                channels.put( req.address, ssc );
-            }
-            catch( IOException e )
-            {
-                req.exception = e;
-            }
-            finally
-            {
-                synchronized( req )
-                {
-                    req.done = true;
-
-                    req.notify();
-                }
-
-                if( ssc != null && req.exception != null )
-                {
-                    try
-                    {
-                        ssc.close();
-                    }
-                    catch( IOException e )
-                    {
-                        ExceptionMonitor.getInstance().exceptionCaught( e );
-                    }
-                }
-            }
-        }
-    }
-
-
-    private void cancelKeys()
-    {
-        if( cancelQueue.isEmpty() )
-        {
-            return;
-        }
-
-        for( ;; )
-        {
-            CancellationRequest request;
-
-            synchronized( cancelQueue )
-            {
-                request = ( CancellationRequest ) cancelQueue.pop();
-            }
-
-            if( request == null )
-            {
-                break;
-            }
-
-            ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
-            
-            // close the channel
-            try
-            {
-                if( ssc == null )
-                {
-                    request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
-                }
-                else
-                {
-                    SelectionKey key = ssc.keyFor( selector );
-
-                    key.cancel();
-
-                    selector.wakeup(); // wake up again to trigger thread death
-
-                    ssc.close();
-                }
-            }
-            catch( IOException e )
-            {
-                ExceptionMonitor.getInstance().exceptionCaught( e );
-            }
-            finally
-            {
-                synchronized( request )
-                {
-                    request.done = true;
-
-                    request.notify();
-                }
-            }
-        }
-    }
-
-    public int getReceiveBufferSize()
-    {
-        return receiveBufferSize;
-    }
-
-    /**
-     * @param receiveBufferSize <tt>-1</tt> to use the default value.
-     */
-    public void setReceiveBufferSize( int receiveBufferSize )
-    {
-        this.receiveBufferSize = receiveBufferSize;
-    }
-
-    public boolean isReuseAddress()
-    {
-        return reuseAddress;
-    }
-
-    public void setReuseAddress( boolean reuseAddress )
-    {
-        this.reuseAddress = reuseAddress;
-    }
-
-    public int getBacklog()
-    {
-        return backlog;
-    }
-
-    public void setBacklog( int backlog )
-    {
-        if( backlog <= 0 )
-        {
-            throw new IllegalArgumentException( "backlog: " + backlog );
-        }
-        this.backlog = backlog;
-    }
-
-    private static class RegistrationRequest
-    {
-        private final SocketAddress address;
-        private final IoHandler handler;
-        private final IoFilterChainBuilder filterChainBuilder;
-        private IOException exception;
-        private boolean done;
-        
-        private RegistrationRequest( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
-        {
-            this.address = address;
-            this.handler = handler;
-            this.filterChainBuilder = filterChainBuilder;
-        }
-    }
-
-
-    private static class CancellationRequest
-    {
-        private final SocketAddress address;
-
-        private boolean done;
-
-        private RuntimeException exception;
-        
-        private CancellationRequest( SocketAddress address )
-        {
-            this.address = address;
-        }
-    }
-}
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChainBuilder;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.transport.socket.nio.SocketSessionManager;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketAcceptorDelegate extends BaseIoAcceptor implements SocketSessionManager
+{
+    private static volatile int nextId = 0;
+
+    private final IoAcceptor wrapper;
+    private final int id = nextId ++ ;
+    private final String threadName = "SocketAcceptor-" + id;
+    private boolean reuseAddress = false;
+    private int backlog = 50;
+    private int receiveBufferSize = -1;
+    private Selector selector;
+    private final Map channels = new HashMap();
+
+    private final Queue registerQueue = new Queue();
+    private final Queue cancelQueue = new Queue();
+    
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     */
+    public SocketAcceptorDelegate( IoAcceptor wrapper )
+    {
+        this.wrapper = wrapper;
+    }
+
+    /**
+     * Binds to the specified <code>address</code> and handles incoming
+     * connections with the specified <code>handler</code>.  Backlog value
+     * is configured to the value of <code>backlog</code> property.
+     *
+     * @throws IOException if failed to bind
+     */
+    public void bind( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder ) throws IOException
+    {
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        if( handler == null )
+        {
+            throw new NullPointerException( "handler" );
+        }
+
+        if( !( address instanceof InetSocketAddress ) )
+        {
+            throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+        }
+
+        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+        {
+            throw new IllegalArgumentException( "Unsupported port number: 0" );
+        }
+        
+        if( filterChainBuilder == null )
+        {
+            filterChainBuilder = IoFilterChainBuilder.NOOP;
+        }
+        
+        RegistrationRequest request = new RegistrationRequest( address, handler, filterChainBuilder );
+
+        synchronized( this )
+        {
+            synchronized( registerQueue )
+            {
+                registerQueue.push( request );
+            }
+            startupWorker();
+        }
+        
+        selector.wakeup();
+        
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        
+        if( request.exception != null )
+        {
+            throw request.exception;
+        }
+    }
+
+
+    private synchronized void startupWorker() throws IOException
+    {
+        if( worker == null )
+        {
+            selector = Selector.open();
+            worker = new Worker();
+
+            worker.start();
+        }
+    }
+
+    public void unbind( SocketAddress address )
+    {
+        // TODO: DIRMINA-93
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        CancellationRequest request = new CancellationRequest( address );
+        synchronized( this )
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch( IOException e )
+            {
+                // IOException is thrown only when Worker thread is not
+                // running and failed to open a selector.  We simply throw
+                // IllegalArgumentException here because we can simply
+                // conclude that nothing is bound to the selector.
+                throw new IllegalArgumentException( "Address not bound: " + address );
+            }
+
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( request );
+            }
+        }
+        
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        
+        if( request.exception != null )
+        {
+            request.exception.fillInStackTrace();
+
+            throw request.exception;
+        }
+    }
+    
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( SocketAcceptorDelegate.this.threadName );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+                    cancelKeys();
+
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( SocketAcceptorDelegate.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+
+        private void processSessions( Set keys ) throws IOException
+        {
+            Iterator it = keys.iterator();
+            while( it.hasNext() )
+            {
+                SelectionKey key = ( SelectionKey ) it.next();
+   
+                it.remove();
+   
+                if( !key.isAcceptable() )
+                {
+                    continue;
+                }
+   
+                ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+   
+                SocketChannel ch = ssc.accept();
+   
+                if( ch == null )
+                {
+                    continue;
+                }
+   
+                boolean success = false;
+                try
+                {
+                    RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+                    SocketSessionImpl session = new SocketSessionImpl( SocketAcceptorDelegate.this.wrapper, ch, req.handler );
+                    SocketAcceptorDelegate.this.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+                    req.filterChainBuilder.buildFilterChain( session.getFilterChain() );
+                    ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
+                    session.getIoProcessor().addNew( session );
+                    success = true;
+                }
+                catch( Throwable t )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( t );
+                }
+                finally
+                {
+                    if( !success )
+                    {
+                        ch.close();
+                    }
+                }
+            }
+        }
+    }
+
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+            {
+                break;
+            }
+
+            ServerSocketChannel ssc = null;
+
+            try
+            {
+                ssc = ServerSocketChannel.open();
+                ssc.configureBlocking( false );
+                
+                // Configure the server socket,
+                ssc.socket().setReuseAddress( isReuseAddress() );
+                if( getReceiveBufferSize() > 0 )
+                {
+                    ssc.socket().setReceiveBufferSize( getReceiveBufferSize() );
+                }
+                
+                // and bind.
+                ssc.socket().bind( req.address, getBacklog() );
+                ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+                channels.put( req.address, ssc );
+            }
+            catch( IOException e )
+            {
+                req.exception = e;
+            }
+            finally
+            {
+                synchronized( req )
+                {
+                    req.done = true;
+
+                    req.notify();
+                }
+
+                if( ssc != null && req.exception != null )
+                {
+                    try
+                    {
+                        ssc.close();
+                    }
+                    catch( IOException e )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e );
+                    }
+                }
+            }
+        }
+    }
+
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ;; )
+        {
+            CancellationRequest request;
+
+            synchronized( cancelQueue )
+            {
+                request = ( CancellationRequest ) cancelQueue.pop();
+            }
+
+            if( request == null )
+            {
+                break;
+            }
+
+            ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
+            
+            // close the channel
+            try
+            {
+                if( ssc == null )
+                {
+                    request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+                }
+                else
+                {
+                    SelectionKey key = ssc.keyFor( selector );
+
+                    key.cancel();
+
+                    selector.wakeup(); // wake up again to trigger thread death
+
+                    ssc.close();
+                }
+            }
+            catch( IOException e )
+            {
+                ExceptionMonitor.getInstance().exceptionCaught( e );
+            }
+            finally
+            {
+                synchronized( request )
+                {
+                    request.done = true;
+
+                    request.notify();
+                }
+            }
+        }
+    }
+
+    public int getReceiveBufferSize()
+    {
+        return receiveBufferSize;
+    }
+
+    /**
+     * @param receiveBufferSize <tt>-1</tt> to use the default value.
+     */
+    public void setReceiveBufferSize( int receiveBufferSize )
+    {
+        this.receiveBufferSize = receiveBufferSize;
+    }
+
+    public boolean isReuseAddress()
+    {
+        return reuseAddress;
+    }
+
+    public void setReuseAddress( boolean reuseAddress )
+    {
+        this.reuseAddress = reuseAddress;
+    }
+
+    public int getBacklog()
+    {
+        return backlog;
+    }
+
+    public void setBacklog( int backlog )
+    {
+        if( backlog <= 0 )
+        {
+            throw new IllegalArgumentException( "backlog: " + backlog );
+        }
+        this.backlog = backlog;
+    }
+
+    private static class RegistrationRequest
+    {
+        private final SocketAddress address;
+        private final IoHandler handler;
+        private final IoFilterChainBuilder filterChainBuilder;
+        private IOException exception;
+        private boolean done;
+        
+        private RegistrationRequest( SocketAddress address, IoHandler handler, IoFilterChainBuilder filterChainBuilder )
+        {
+            this.address = address;
+            this.handler = handler;
+            this.filterChainBuilder = filterChainBuilder;
+        }
+    }
+
+
+    private static class CancellationRequest
+    {
+        private final SocketAddress address;
+
+        private boolean done;
+
+        private RuntimeException exception;
+        
+        private CancellationRequest( SocketAddress address )
+        {
+            this.address = address;
+        }
+    }
+}