You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/05/09 11:21:54 UTC

svn commit: r169259 [4/5] - in /directory/network/branches/api_integration/src: examples/org/apache/mina/examples/echoserver/ examples/org/apache/mina/examples/httpserver/ examples/org/apache/mina/examples/netcat/ examples/org/apache/mina/examples/reverser/ examples/org/apache/mina/examples/tennis/ java/org/apache/mina/common/ java/org/apache/mina/filter/ java/org/apache/mina/filter/codec/ java/org/apache/mina/handler/ java/org/apache/mina/io/ java/org/apache/mina/io/datagram/ java/org/apache/mina/io/filter/ java/org/apache/mina/io/handler/ java/org/apache/mina/io/socket/ java/org/apache/mina/protocol/ java/org/apache/mina/protocol/codec/ java/org/apache/mina/protocol/filter/ java/org/apache/mina/protocol/handler/ java/org/apache/mina/protocol/io/ java/org/apache/mina/protocol/vmpipe/ java/org/apache/mina/registry/ java/org/apache/mina/transport/ java/org/apache/mina/transport/socket/ java/org/apache/mina/transport/socket/bio/ java/org/apache/mina/transport/socket/nio/ java/org/apache/mina/transport/vmpipe/ java/org/apache/mina/util/ test/org/apache/mina/examples/echoserver/ test/org/apache/mina/io/ test/org/apache/mina/io/datagram/ test/org/apache/mina/io/socket/ test/org/apache/mina/protocol/ test/org/apache/mina/protocol/codec/ test/org/apache/mina/util/

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Mon May  9 02:21:50 2005
@@ -0,0 +1,528 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for datagram transport (UDP/IP).
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
+{
+    private static volatile int nextId = 0;
+
+    private final IoSessionManagerFilterChain filters =
+        new DatagramSessionManagerFilterChain( this );
+
+    private final int id = nextId ++ ;
+
+    private final Selector selector;
+
+    private final Map channels = new HashMap();
+
+    private final Queue registerQueue = new Queue();
+
+    private final Queue cancelQueue = new Queue();
+
+    private final Queue flushingSessions = new Queue();
+
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     * 
+     * @throws IOException if failed to open a selector.
+     */
+    public DatagramAcceptor() throws IOException
+    {
+        selector = Selector.open();
+    }
+
+    public void bind( SocketAddress address, IoHandler handler )
+            throws IOException
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+
+        if( !( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+            throw new IllegalArgumentException( "Unsupported port number: 0" );
+        
+        RegistrationRequest request = new RegistrationRequest( address, handler );
+        synchronized( this )
+        {
+            synchronized( registerQueue )
+            {
+                registerQueue.push( request );
+            }
+            startupWorker();
+        }
+        selector.wakeup();
+        
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        
+        if( request.exception != null )
+        {
+            ExceptionUtil.throwException( request.exception );
+        }
+    }
+
+    public void unbind( SocketAddress address )
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+
+        CancellationRequest request = new CancellationRequest( address );
+        synchronized( this )
+        {
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( request );
+            }
+            startupWorker();
+        }
+        selector.wakeup();
+        
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        
+        if( request.exception != null )
+        {
+            request.exception.fillInStackTrace();
+            throw request.exception;
+        }
+    }
+    
+    private synchronized void startupWorker()
+    {
+        if( worker == null )
+        {
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    void flushSession( DatagramSession session )
+    {
+        scheduleFlush( session );
+        selector.wakeup();
+    }
+
+    void closeSession( DatagramSession session )
+    {
+    }
+
+    private void scheduleFlush( DatagramSession session )
+    {
+        synchronized( flushingSessions )
+        {
+            flushingSessions.push( session );
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "DatagramAcceptor-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+
+                    if( nKeys > 0 )
+                    {
+                        processReadySessions( selector.selectedKeys() );
+                    }
+
+                    flushSessions();
+                    cancelKeys();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( DatagramAcceptor.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
+                            e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    private void processReadySessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+            it.remove();
+
+            DatagramChannel ch = ( DatagramChannel ) key.channel();
+
+            RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+            DatagramSession session = new DatagramSession(
+                    filters, ch, req.handler );
+            session.setSelectionKey( key );
+            
+            try
+            {
+                req.handler.sessionCreated( session );
+
+                if( key.isReadable() )
+                {
+                    readSession( session );
+                }
+
+                if( key.isWritable() )
+                {
+                    scheduleFlush( session );
+                }
+            }
+            catch( Throwable t )
+            {
+                exceptionMonitor.exceptionCaught( this, t );
+            }
+        }
+    }
+
+    private void readSession( DatagramSession session )
+    {
+
+        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
+        try
+        {
+            SocketAddress remoteAddress = session.getChannel().receive(
+                    readBuf.buf() );
+            if( remoteAddress != null )
+            {
+                readBuf.flip();
+                session.setRemoteAddress( remoteAddress );
+
+                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
+                newBuf.put( readBuf );
+                newBuf.flip();
+
+                session.increaseReadBytes( newBuf.remaining() );
+                filters.messageReceived( session, newBuf );
+            }
+        }
+        catch( IOException e )
+        {
+            filters.exceptionCaught( session, e );
+        }
+        finally
+        {
+            readBuf.release();
+        }
+    }
+
+    private void flushSessions()
+    {
+        if( flushingSessions.size() == 0 )
+            return;
+
+        for( ;; )
+        {
+            DatagramSession session;
+
+            synchronized( flushingSessions )
+            {
+                session = ( DatagramSession ) flushingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            try
+            {
+                flush( session );
+            }
+            catch( IOException e )
+            {
+                session.getManagerFilterChain().exceptionCaught( session, e );
+            }
+        }
+    }
+
+    private void flush( DatagramSession session ) throws IOException
+    {
+        DatagramChannel ch = session.getChannel();
+
+        Queue writeBufferQueue = session.getWriteBufferQueue();
+
+        ByteBuffer buf;
+        for( ;; )
+        {
+            synchronized( writeBufferQueue )
+            {
+                buf = ( ByteBuffer ) writeBufferQueue.first();
+            }
+
+            if( buf == null )
+                break;
+
+            if( buf.remaining() == 0 )
+            {
+                // pop and fire event
+                synchronized( writeBufferQueue )
+                {
+                    writeBufferQueue.pop();
+                }
+
+                // FIXME buffer must be released after messageSent is fired.
+                session.getManagerFilterChain().messageSent( session, buf );
+                continue;
+            }
+
+            int writtenBytes = ch
+                    .send( buf.buf(), session.getRemoteAddress() );
+
+            SelectionKey key = session.getSelectionKey();
+            if( writtenBytes == 0 )
+            {
+                // Kernel buffer is full
+                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+            }
+            else if( writtenBytes > 0 )
+            {
+                key.interestOps( key.interestOps()
+                                 & ( ~SelectionKey.OP_WRITE ) );
+
+                // pop and fire event
+                synchronized( writeBufferQueue )
+                {
+                    writeBufferQueue.pop();
+                }
+
+                session.increaseWrittenBytes( writtenBytes );
+                session.getManagerFilterChain().messageSent( session, buf );
+            }
+        }
+    }
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+                break;
+
+            DatagramChannel ch = null;
+            try
+            {
+                ch = DatagramChannel.open();
+                ch.configureBlocking( false );
+                ch.socket().bind( req.address );
+                ch.register( selector, SelectionKey.OP_READ, req );
+                channels.put( req.address, ch );
+            }
+            catch( Throwable t )
+            {
+                req.exception = t;
+            }
+            finally
+            {
+                synchronized( req )
+                {
+                    req.done = true;
+                    req.notify();
+                }
+
+                if( ch != null && req.exception != null )
+                {
+                    try
+                    {
+                        ch.close();
+                    }
+                    catch( Throwable e )
+                    {
+                        exceptionMonitor.exceptionCaught( this, e );
+                    }
+                }
+            }
+        }
+    }
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            CancellationRequest request;
+            synchronized( cancelQueue )
+            {
+                request = ( CancellationRequest ) cancelQueue.pop();
+            }
+            
+            if( request == null )
+            {
+                break;
+            }
+
+            DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
+            // close the channel
+            try
+            {
+                if( ch == null )
+                {
+                    request.exception = new IllegalArgumentException(
+                            "Address not bound: " + request.address );
+                }
+                else
+                {
+                    SelectionKey key = ch.keyFor( selector );
+                    key.cancel();
+                    selector.wakeup(); // wake up again to trigger thread death
+                    ch.close();
+                }
+            }
+            catch( Throwable t )
+            {
+                exceptionMonitor.exceptionCaught( this, t );
+            }
+            finally
+            {
+                synchronized( request )
+                {
+                    request.done = true;
+                    request.notify();
+                }
+            }
+        }
+    }
+    
+    public IoFilterChain getFilterChain()
+    {
+        return filters;
+    }
+
+    private static class RegistrationRequest
+    {
+        private final SocketAddress address;
+        
+        private final IoHandler handler;
+        
+        private Throwable exception; 
+        
+        private boolean done;
+        
+        private RegistrationRequest( SocketAddress address, IoHandler handler )
+        {
+            this.address = address;
+            this.handler = handler;
+        }
+    }
+
+    private static class CancellationRequest
+    {
+        private final SocketAddress address;
+        private boolean done;
+        private RuntimeException exception;
+        
+        private CancellationRequest( SocketAddress address )
+        {
+            this.address = address;
+        }
+    }
+}

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Mon May  9 02:21:50 2005
@@ -0,0 +1,501 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for datagram transport (UDP/IP).
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class DatagramConnector extends DatagramSessionManager implements IoConnector
+{
+    private static volatile int nextId = 0;
+
+    private final IoSessionManagerFilterChain filters =
+        new DatagramSessionManagerFilterChain( this );
+
+    private final int id = nextId ++ ;
+
+    private final Selector selector;
+
+    private final Queue registerQueue = new Queue();
+
+    private final Queue cancelQueue = new Queue();
+
+    private final Queue flushingSessions = new Queue();
+
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     * 
+     * @throws IOException if failed to open a selector
+     */
+    public DatagramConnector() throws IOException
+    {
+        selector = Selector.open();
+    }
+
+    public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
+    {
+        return connect( address, null, handler);
+    }
+
+    public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
+    {
+        return connect( address, null, handler );
+    }
+
+    public IoSession connect( SocketAddress address, SocketAddress localAddress, int timeout, IoHandler handler ) throws IOException
+    {
+        return connect( address, localAddress, handler );
+    }
+
+    public IoSession connect( SocketAddress address, SocketAddress localAddress,
+                              IoHandler handler ) throws IOException
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+
+        if( !( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+        
+        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+        {
+            throw new IllegalArgumentException( "Unexpected local address type: "
+                                                + localAddress.getClass() );
+        }
+        
+        DatagramChannel ch = DatagramChannel.open();
+        boolean initialized = false;
+        try
+        {
+            ch.socket().setReuseAddress( true );
+            if( localAddress != null )
+            {
+                ch.socket().bind( localAddress );
+            }
+            ch.connect( address );
+            ch.configureBlocking( false );
+            initialized = true;
+        }
+        finally
+        {
+            if( !initialized )
+            {
+                ch.close();
+            }
+        }
+
+        RegistrationRequest request = new RegistrationRequest( ch, handler );
+        synchronized( this )
+        {
+            synchronized( registerQueue )
+            {
+                registerQueue.push( request );
+            }
+            startupWorker();
+        }
+
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        
+        if( request.exception != null )
+        {
+            ExceptionUtil.throwException( request.exception );
+        }
+
+        return request.session;
+    }
+    
+    private synchronized void startupWorker()
+    {
+        if( worker == null )
+        {
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    void closeSession( DatagramSession session )
+    {
+        synchronized( this )
+        {
+            SelectionKey key = session.getSelectionKey();
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( key );
+            }
+            startupWorker();
+        }
+
+        selector.wakeup();
+    }
+
+    void flushSession( DatagramSession session )
+    {
+        scheduleFlush( session );
+        selector.wakeup();
+    }
+
+    private void scheduleFlush( DatagramSession session )
+    {
+        synchronized( flushingSessions )
+        {
+            flushingSessions.push( session );
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "DatagramAcceptor-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+
+                    if( nKeys > 0 )
+                    {
+                        processReadySessions( selector.selectedKeys() );
+                    }
+
+                    flushSessions();
+                    cancelKeys();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( DatagramConnector.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor.exceptionCaught( DatagramConnector.this,
+                            e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    private void processReadySessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+            it.remove();
+
+            DatagramSession session = ( DatagramSession ) key.attachment();
+
+            if( key.isReadable() )
+            {
+                readSession( session );
+            }
+
+            if( key.isWritable() )
+            {
+                scheduleFlush( session );
+            }
+        }
+    }
+
+    private void readSession( DatagramSession session )
+    {
+
+        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
+        try
+        {
+            int readBytes = session.getChannel().read( readBuf.buf() );
+            if( readBytes > 0 )
+            {
+                readBuf.flip();
+                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
+                newBuf.put( readBuf );
+                newBuf.flip();
+
+                session.increaseReadBytes( readBytes );
+                filters.messageReceived( session, newBuf );
+            }
+        }
+        catch( IOException e )
+        {
+            filters.exceptionCaught( session, e );
+        }
+        finally
+        {
+            readBuf.release();
+        }
+    }
+
+    private void flushSessions()
+    {
+        if( flushingSessions.size() == 0 )
+            return;
+
+        for( ;; )
+        {
+            DatagramSession session;
+
+            synchronized( flushingSessions )
+            {
+                session = ( DatagramSession ) flushingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            try
+            {
+                flush( session );
+            }
+            catch( IOException e )
+            {
+                session.getManagerFilterChain().exceptionCaught( session, e );
+            }
+        }
+    }
+
+    private void flush( DatagramSession session ) throws IOException
+    {
+        DatagramChannel ch = session.getChannel();
+
+        Queue writeBufferQueue = session.getWriteBufferQueue();
+
+        ByteBuffer buf;
+        for( ;; )
+        {
+            synchronized( writeBufferQueue )
+            {
+                buf = ( ByteBuffer ) writeBufferQueue.first();
+            }
+
+            if( buf == null )
+                break;
+
+            if( buf.remaining() == 0 )
+            {
+                // pop and fire event
+                synchronized( writeBufferQueue )
+                {
+                    writeBufferQueue.pop();
+                }
+
+                // FIXME buffer must be released after messageSent is fired.
+                session.getManagerFilterChain().messageSent( session, buf );
+                continue;
+            }
+
+            int writtenBytes = ch.write( buf.buf() );
+
+            SelectionKey key = session.getSelectionKey();
+            if( writtenBytes == 0 )
+            {
+                // Kernel buffer is full
+                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+            }
+            else if( writtenBytes > 0 )
+            {
+                key.interestOps( key.interestOps()
+                                 & ( ~SelectionKey.OP_WRITE ) );
+
+                // pop and fire event
+                synchronized( writeBufferQueue )
+                {
+                    writeBufferQueue.pop();
+                }
+
+                session.increaseWrittenBytes( writtenBytes );
+                session.getManagerFilterChain().messageSent( session, buf );
+            }
+        }
+    }
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+                break;
+
+            DatagramSession session = new DatagramSession(
+                    filters, req.channel, req.handler );
+
+            try
+            {
+                req.handler.sessionCreated( session );
+
+                SelectionKey key = req.channel.register( selector,
+                        SelectionKey.OP_READ, session );
+    
+                session.setSelectionKey( key );
+            }
+            catch( Throwable t )
+            {
+                req.exception = t;
+            }
+            finally 
+            {
+                synchronized( req )
+                {
+                    req.done = true;
+                    req.session = session;
+                    req.notify();
+                }
+                
+                if( req.exception != null )
+                {
+                    try
+                    {
+                        req.channel.close();
+                    }
+                    catch (IOException e)
+                    {
+                        exceptionMonitor.exceptionCaught( this, e );
+                    }
+                }
+            }
+        }
+    }
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SelectionKey key;
+            synchronized( cancelQueue )
+            {
+                key = ( SelectionKey ) cancelQueue.pop();
+            }
+
+            if( key == null )
+                break;
+            else
+            {
+                DatagramChannel ch = ( DatagramChannel ) key.channel();
+                try
+                {
+                    ch.close();
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor.exceptionCaught( this, e );
+                }
+                key.cancel();
+                selector.wakeup(); // wake up again to trigger thread death
+            }
+        }
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return filters;
+    }
+
+    private static class RegistrationRequest
+    {
+        private final DatagramChannel channel;
+
+        private final IoHandler handler;
+        
+        private boolean done;
+        
+        private DatagramSession session;
+        
+        private Throwable exception;
+
+        private RegistrationRequest( DatagramChannel channel,
+                                     IoHandler handler )
+        {
+            this.channel = channel;
+            this.handler = handler;
+        }
+    }
+}

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java Mon May  9 02:21:50 2005
@@ -0,0 +1,185 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+
+import org.apache.mina.common.BaseIoSession;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.IoSessionFilterChain;
+import org.apache.mina.common.IoSessionManager;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.common.TransportType;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoSession} for datagram transport (UDP/IP).
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+class DatagramSession extends BaseIoSession implements IoSession
+{
+    private final IoSessionManagerFilterChain managerFilterChain;
+    
+    private final IoSessionFilterChain filterChain;
+
+    private final DatagramChannel ch;
+
+    private final DatagramSessionConfig config;
+
+    private final Queue writeBufferQueue;
+
+    private final IoHandler handler;
+
+    private final SocketAddress localAddress;
+
+    private SocketAddress remoteAddress;
+
+    private SelectionKey key;
+    
+    private boolean disposed;
+
+    /**
+     * Creates a new instance.
+     */
+    DatagramSession( IoSessionManagerFilterChain managerFilterChain,
+                     DatagramChannel ch, IoHandler defaultHandler )
+    {
+        this.managerFilterChain = managerFilterChain;
+        this.filterChain = new IoSessionFilterChain( managerFilterChain );
+        this.ch = ch;
+        this.config = new DatagramSessionConfig( ch );
+        this.writeBufferQueue = new Queue();
+        this.handler = defaultHandler;
+        this.remoteAddress = ch.socket().getRemoteSocketAddress();
+        this.localAddress = ch.socket().getLocalSocketAddress();
+    }
+
+    IoSessionManagerFilterChain getManagerFilterChain()
+    {
+        return managerFilterChain;
+    }
+    
+    public IoFilterChain getFilterChain()
+    {
+        return filterChain;
+    }
+
+    DatagramChannel getChannel()
+    {
+        return ch;
+    }
+
+    SelectionKey getSelectionKey()
+    {
+        return key;
+    }
+
+    void setSelectionKey( SelectionKey key )
+    {
+        this.key = key;
+    }
+
+    public IoHandler getHandler()
+    {
+        return handler;
+    }
+    
+    synchronized void notifyClose()
+    {
+        if( !disposed )
+        {
+            disposed = true;
+            notify();
+        }
+    }
+
+    public synchronized void close( boolean wait )
+    {
+        if( disposed )
+        {
+            return;
+        }
+
+        IoSessionManager manager = managerFilterChain.getManager();
+        if( manager instanceof DatagramConnector )
+        {
+            ( ( DatagramConnector ) manager ).closeSession( this );
+            if( wait )
+            {
+                while( disposed )
+                {
+                    try
+                    {
+                        wait();
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    Queue getWriteBufferQueue()
+    {
+        return writeBufferQueue;
+    }
+
+    public void write( Object message )
+    {
+        filterChain.filterWrite( this, message );
+    }
+
+    public TransportType getTransportType()
+    {
+        return TransportType.DATAGRAM;
+    }
+
+    public boolean isConnected()
+    {
+        return ch.isConnected();
+    }
+
+    public IoSessionConfig getConfig()
+    {
+        return config;
+    }
+
+    public SocketAddress getRemoteAddress()
+    {
+        return remoteAddress;
+    }
+
+    void setRemoteAddress( SocketAddress remoteAddress )
+    {
+        this.remoteAddress = remoteAddress;
+    }
+
+    public SocketAddress getLocalAddress()
+    {
+        return localAddress;
+    }
+}
\ No newline at end of file

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java Mon May  9 02:21:50 2005
@@ -0,0 +1,65 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.net.SocketException;
+import java.nio.channels.DatagramChannel;
+
+import org.apache.mina.common.BaseIoSessionConfig;
+import org.apache.mina.common.IoSession;
+
+/**
+ * A {@link SessionConfig} for datagram transport (UDP/IP).
+ * You can downcast {@link SessionConfig} instance returned by
+ * {@link IoSession#getConfig()} or {@link IoSession#getConfig()}
+ * if you've created datagram session using {@link DatagramAcceptor} or 
+ * {@link DatagramConnector}.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class DatagramSessionConfig extends BaseIoSessionConfig
+{
+    private final DatagramChannel ch;
+
+    DatagramSessionConfig( DatagramChannel ch )
+    {
+        this.ch = ch;
+    }
+
+    public boolean getReuseAddress() throws SocketException
+    {
+        return ch.socket().getReuseAddress();
+    }
+
+    public void setReuseAddress( boolean on ) throws SocketException
+    {
+        ch.socket().setReuseAddress( on );
+    }
+
+    public int getTrafficClass() throws SocketException
+    {
+        return ch.socket().getTrafficClass();
+    }
+
+    public void setTrafficClass( int tc ) throws SocketException
+    {
+        ch.socket().setTrafficClass( tc );
+    }
+}
\ No newline at end of file

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java Mon May  9 02:21:50 2005
@@ -0,0 +1,45 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.BaseIoSessionManager;
+import org.apache.mina.common.IoSessionManager;
+
+/**
+ * A base class for {@link DatagramAcceptor} and {@link DatagramConnector}.
+ * Session interacts with this abstract class instead of those two concrete
+ * classes.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+abstract class DatagramSessionManager extends BaseIoSessionManager implements IoSessionManager
+{
+    /**
+     * Requests this processor to flush the write buffer of the specified
+     * session.  This method is invoked by MINA internally.
+     */
+    abstract void flushSession( DatagramSession session );
+
+    /**
+     * Requests this processor to close the specified session.
+     * This method is invoked by MINA internally.
+     */
+    abstract void closeSession( DatagramSession session );
+}
\ No newline at end of file

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java Mon May  9 02:21:50 2005
@@ -0,0 +1,32 @@
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * An {@link IoFilterChain} for datagram transport (UDP/IP).
+ * 
+ * @author The Apache Directory Project
+ */
+class DatagramSessionManagerFilterChain extends IoSessionManagerFilterChain {
+
+    DatagramSessionManagerFilterChain( DatagramSessionManager processor )
+    {
+        super( processor );
+    }
+    
+    protected void doWrite( IoSession session, Object message )
+    {
+        DatagramSession s = ( DatagramSession ) session;
+        Queue writeBufferQueue = s.getWriteBufferQueue();
+        
+        synchronized( writeBufferQueue )
+        {
+            writeBufferQueue.push( message );
+        }
+
+        ( ( DatagramSessionManager ) getManager() ).flushSession( s );
+    }
+}

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Mon May  9 02:21:50 2005
@@ -0,0 +1,471 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.BaseIoSessionManager;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ * 
+ * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
+ * @version $Rev$, $Date$
+ */
+public class SocketAcceptor extends BaseIoSessionManager implements IoAcceptor
+{
+    private static volatile int nextId = 0;
+
+    private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
+
+    private final int id = nextId ++ ;
+
+    private final Selector selector;
+
+    private final Map channels = new HashMap();
+
+    private final Queue registerQueue = new Queue();
+
+    private final Queue cancelQueue = new Queue();
+    
+    private int backlog = 50;
+
+    private Worker worker;
+
+
+    /**
+     * Creates a new instance.
+     * 
+     * @throws IOException
+     */
+    public SocketAcceptor() throws IOException
+    {
+        selector = Selector.open();
+    }
+
+    /**
+     * Binds to the specified <code>address</code> and handles incoming
+     * connections with the specified <code>handler</code>.  Backlog value
+     * is configured to the value of <code>backlog</code> property.
+     *
+     * @throws IOException if failed to bind
+     */
+    public void bind( SocketAddress address, IoHandler handler ) throws IOException
+    {
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        if( handler == null )
+        {
+            throw new NullPointerException( "handler" );
+        }
+
+        if( !( address instanceof InetSocketAddress ) )
+        {
+            throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+        }
+
+        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+        {
+            throw new IllegalArgumentException( "Unsupported port number: 0" );
+        }
+        
+        RegistrationRequest request = new RegistrationRequest( address, backlog, handler );
+
+        synchronized( this )
+        {
+            synchronized( registerQueue )
+            {
+                registerQueue.push( request );
+            }
+            startupWorker();
+        }
+        
+        selector.wakeup();
+        
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        
+        if( request.exception != null )
+        {
+            throw request.exception;
+        }
+    }
+
+
+    private synchronized void startupWorker()
+    {
+        if( worker == null )
+        {
+            worker = new Worker();
+
+            worker.start();
+        }
+    }
+
+
+    public void unbind( SocketAddress address )
+    {
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        CancellationRequest request = new CancellationRequest( address );
+        synchronized( this )
+        {
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( request );
+            }
+            startupWorker();
+        }
+        
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        
+        if( request.exception != null )
+        {
+            request.exception.fillInStackTrace();
+
+            throw request.exception;
+        }
+    }
+    
+    /**
+     * Returns the default backlog value which is used when user binds. 
+     */
+    public int getBacklog()
+    {
+        return backlog;
+    }
+    
+    /**
+     * Sets the default backlog value which is used when user binds. 
+     */
+    public void setBacklog( int defaultBacklog )
+    {
+        if( defaultBacklog <= 0 )
+        {
+            throw new IllegalArgumentException( "defaultBacklog: " + defaultBacklog );
+        }
+        this.backlog = defaultBacklog;
+    }
+
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "SocketAcceptor-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+                    cancelKeys();
+
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( SocketAcceptor.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor.exceptionCaught( SocketAcceptor.this, e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+
+        private void processSessions( Set keys ) throws IOException
+        {
+            Iterator it = keys.iterator();
+            while( it.hasNext() )
+            {
+                SelectionKey key = ( SelectionKey ) it.next();
+   
+                it.remove();
+   
+                if( !key.isAcceptable() )
+                {
+                    continue;
+                }
+   
+                ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+   
+                SocketChannel ch = ssc.accept();
+   
+                if( ch == null )
+                {
+                    continue;
+                }
+   
+                boolean success = false;
+                try
+                {
+                    RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+                    SocketSession session = new SocketSession( filters, ch, req.handler );
+                    req.handler.sessionCreated( session );
+                    SocketIoProcessor.getInstance().addSession( session );
+                    success = true;
+                }
+                catch( Throwable t )
+                {
+                    exceptionMonitor.exceptionCaught( SocketAcceptor.this, t );
+                }
+                finally
+                {
+                    if( !success )
+                    {
+                        ch.close();
+                    }
+                }
+            }
+        }
+    }
+
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+            {
+                break;
+            }
+
+            ServerSocketChannel ssc = null;
+
+            try
+            {
+                ssc = ServerSocketChannel.open();
+                ssc.configureBlocking( false );
+                ssc.socket().bind( req.address, req.backlog );
+                ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+                channels.put( req.address, ssc );
+            }
+            catch( IOException e )
+            {
+                req.exception = e;
+            }
+            finally
+            {
+                synchronized( req )
+                {
+                    req.done = true;
+
+                    req.notify();
+                }
+
+                if( ssc != null && req.exception != null )
+                {
+                    try
+                    {
+                        ssc.close();
+                    }
+                    catch( IOException e )
+                    {
+                        exceptionMonitor.exceptionCaught( this, e );
+                    }
+                }
+            }
+        }
+    }
+
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ;; )
+        {
+            CancellationRequest request;
+
+            synchronized( cancelQueue )
+            {
+                request = ( CancellationRequest ) cancelQueue.pop();
+            }
+
+            if( request == null )
+            {
+                break;
+            }
+
+            ServerSocketChannel ssc = ( ServerSocketChannel ) channels.remove( request.address );
+            
+            // close the channel
+            try
+            {
+                if( ssc == null )
+                {
+                    request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+                }
+                else
+                {
+                    SelectionKey key = ssc.keyFor( selector );
+
+                    key.cancel();
+
+                    selector.wakeup(); // wake up again to trigger thread death
+
+                    ssc.close();
+                }
+            }
+            catch( IOException e )
+            {
+                exceptionMonitor.exceptionCaught( this, e );
+            }
+            finally
+            {
+                synchronized( request )
+                {
+                    request.done = true;
+
+                    request.notify();
+                }
+            }
+        }
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return filters;
+    }
+
+    private static class RegistrationRequest
+    {
+        private final SocketAddress address;
+        
+        private final int backlog;
+
+        private final IoHandler handler;
+        
+        private IOException exception; 
+        
+        private boolean done;
+        
+        private RegistrationRequest( SocketAddress address, int backlog, IoHandler handler )
+        {
+            this.address = address;
+            this.backlog = backlog;
+            this.handler = handler;
+        }
+    }
+
+
+    private static class CancellationRequest
+    {
+        private final SocketAddress address;
+
+        private boolean done;
+
+        private RuntimeException exception;
+        
+        private CancellationRequest( SocketAddress address )
+        {
+            this.address = address;
+        }
+    }
+}

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java Mon May  9 02:21:50 2005
@@ -0,0 +1,381 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.BaseIoSessionManager;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class SocketConnector extends BaseIoSessionManager implements IoConnector
+{
+    private static volatile int nextId = 0;
+
+    private final int id = nextId++;
+
+    private final IoSessionManagerFilterChain filters = new SocketSessionManagerFilterChain( this );
+
+    private final Selector selector;
+
+    private final Queue connectQueue = new Queue();
+
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     * 
+     * @throws IOException
+     */
+    public SocketConnector() throws IOException
+    {
+        selector = Selector.open();
+    }
+
+    public IoSession connect( SocketAddress address, IoHandler handler ) throws IOException
+    {
+        return connect( address, null, Integer.MAX_VALUE, handler);
+    }
+
+    public IoSession connect( SocketAddress address, SocketAddress localAddress, IoHandler handler ) throws IOException
+    {
+        return connect( address, localAddress, Integer.MAX_VALUE, handler);
+    }
+
+    public IoSession connect( SocketAddress address, int timeout, IoHandler handler ) throws IOException
+    {
+        return connect( address, null, timeout, handler);
+    }
+
+    public IoSession connect( SocketAddress address, SocketAddress localAddress,
+                              int timeout, IoHandler handler ) throws IOException
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+
+        if( timeout <= 0 )
+            throw new IllegalArgumentException( "Illegal timeout: " + timeout );
+
+        if( ! ( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+
+        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected local address type: "
+                                                + localAddress.getClass() );
+
+        SocketChannel ch = SocketChannel.open();
+        boolean success = false;
+        try
+        {
+            ch.socket().setReuseAddress( true );
+            if( localAddress != null )
+            {
+                ch.socket().bind( localAddress );
+            }
+    
+            ch.configureBlocking( false );
+
+            if( ch.connect( address ) )
+            {
+                SocketSession session = newSession( ch, handler );
+                success = true;
+                return session;
+            }
+            
+            success = true;
+        }
+        finally
+        {
+            if( !success )
+            {
+                ch.close();
+            }
+        }
+        
+        ConnectionRequest request = new ConnectionRequest( ch, timeout, handler );
+        synchronized( this )
+        {
+            synchronized( connectQueue )
+            {
+                connectQueue.push( request );
+            }
+            startupWorker();
+        }
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+
+        if( request.exception != null )
+        {
+            ExceptionUtil.throwException( request.exception );
+        }
+
+        return request.session;
+    }
+    
+    private synchronized void startupWorker()
+    {
+        if( worker == null )
+        {
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    private void registerNew()
+    {
+        if( connectQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            ConnectionRequest req;
+            synchronized( connectQueue )
+            {
+                req = ( ConnectionRequest ) connectQueue.pop();
+            }
+
+            if( req == null )
+                break;
+            
+            SocketChannel ch = req.channel;
+            try
+            {
+                ch.register( selector, SelectionKey.OP_CONNECT, req );
+            }
+            catch( IOException e )
+            {
+                req.exception = e;
+                synchronized( req )
+                {
+                    req.done = true;
+                    req.notify();
+                }
+            }
+        }
+    }
+    
+    private void processSessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+
+            if( !key.isConnectable() )
+                continue;
+
+            SocketChannel ch = ( SocketChannel ) key.channel();
+            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+            try
+            {
+                ch.finishConnect();
+                SocketSession session = newSession( ch, entry.handler );
+                entry.session = session;
+            }
+            catch( Throwable e )
+            {
+                entry.exception = e;
+            }
+            finally
+            {
+                key.cancel();
+                if( entry.session == null )
+                {
+                    try
+                    {
+                        ch.close();
+                    }
+                    catch( IOException e )
+                    {
+                        exceptionMonitor.exceptionCaught( this, e );
+                    }
+                }
+
+                synchronized( entry )
+                {
+                    entry.done = true;
+                    entry.notify();
+                }
+            }
+        }
+
+        keys.clear();
+    }
+
+    private void processTimedOutSessions( Set keys )
+    {
+        long currentTime = System.currentTimeMillis();
+        Iterator it = keys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+
+            if( !key.isValid() )
+                continue;
+
+            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+            if( currentTime >= entry.deadline )
+            {
+                entry.exception = new ConnectException();
+                entry.done = true;
+
+                synchronized( entry )
+                {
+                    entry.notify();
+                }
+
+                key.cancel();
+            }
+        }
+    }
+
+    private SocketSession newSession( SocketChannel ch, IoHandler handler ) throws IOException
+    {
+        SocketSession session = new SocketSession( filters, ch, handler );
+        try
+        {
+            handler.sessionCreated( session );
+        }
+        catch( Throwable e )
+        {
+            ExceptionUtil.throwException( e );
+        }
+        SocketIoProcessor.getInstance().addSession( session );
+        return session;
+    }
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "SocketConnector-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select( 1000 );
+
+                    registerNew();
+                    
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    processTimedOutSessions( selector.keys() );
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( SocketConnector.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                connectQueue.isEmpty() )
+                            {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor.exceptionCaught( SocketConnector.this, e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    private static class ConnectionRequest
+    {
+        private final SocketChannel channel;
+        
+        private final long deadline;
+
+        private final IoHandler handler;
+        
+        private SocketSession session;
+
+        private boolean done;
+
+        private Throwable exception;
+
+        private ConnectionRequest( SocketChannel channel, int timeout, IoHandler handler )
+        {
+            this.channel = channel;
+            this.deadline = System.currentTimeMillis() + timeout * 1000L;
+            this.handler = handler;
+        }
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return filters;
+    }
+}
\ No newline at end of file

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
URL: http://svn.apache.org/viewcvs/directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?rev=169259&view=auto
==============================================================================
--- directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (added)
+++ directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Mon May  9 02:21:50 2005
@@ -0,0 +1,544 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.Queue;
+
+/**
+ * Performs all I/O operations for sockets which is connected or bound.
+ * This class is used by MINA internally.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+class SocketIoProcessor
+{
+    private static final SocketIoProcessor instance;
+
+    static
+    {
+        SocketIoProcessor tmp;
+
+        try
+        {
+            tmp = new SocketIoProcessor();
+        }
+        catch( IOException e )
+        {
+            InternalError error = new InternalError(
+                                                     "Failed to open selector." );
+            error.initCause( e );
+            throw error;
+        }
+
+        instance = tmp;
+    }
+
+    private final Selector selector;
+
+    private final Queue newSessions = new Queue();
+
+    private final Queue removingSessions = new Queue();
+
+    private final Queue flushingSessions = new Queue();
+
+    private final Queue readableSessions = new Queue();
+
+    private Worker worker;
+
+    private long lastIdleCheckTime = System.currentTimeMillis();
+
+    private SocketIoProcessor() throws IOException
+    {
+        selector = Selector.open();
+    }
+
+    static SocketIoProcessor getInstance()
+    {
+        return instance;
+    }
+
+    void addSession( SocketSession session )
+    {
+        synchronized( this )
+        {
+            synchronized( newSessions )
+            {
+                newSessions.push( session );
+            }
+            startupWorker();
+        }
+
+        selector.wakeup();
+    }
+
+    void removeSession( SocketSession session )
+    {
+        scheduleRemove( session );
+        startupWorker();
+        selector.wakeup();
+    }
+
+    private synchronized void startupWorker()
+    {
+        if( worker == null )
+        {
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    void flushSession( SocketSession session )
+    {
+        scheduleFlush( session );
+        selector.wakeup();
+    }
+
+    void addReadableSession( SocketSession session )
+    {
+        synchronized( readableSessions )
+        {
+            readableSessions.push( session );
+        }
+        selector.wakeup();
+    }
+
+    private void addSessions()
+    {
+        if( newSessions.isEmpty() )
+            return;
+
+        SocketSession session;
+
+        for( ;; )
+        {
+            synchronized( newSessions )
+            {
+                session = ( SocketSession ) newSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SocketChannel ch = session.getChannel();
+            boolean registered;
+
+            try
+            {
+                ch.configureBlocking( false );
+                session.setSelectionKey( ch.register( selector,
+                                                      SelectionKey.OP_READ,
+                                                      session ) );
+                registered = true;
+            }
+            catch( IOException e )
+            {
+                registered = false;
+                session.getManagerFilterChain().exceptionCaught( session, e );
+            }
+
+            if( registered )
+            {
+                session.getManagerFilterChain().sessionOpened( session );
+            }
+        }
+    }
+
+    private void removeSessions()
+    {
+        if( removingSessions.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            SocketSession session;
+
+            synchronized( removingSessions )
+            {
+                session = ( SocketSession ) removingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            SocketChannel ch = session.getChannel();
+            SelectionKey key = session.getSelectionKey();
+            if( !key.isValid() ) // skip if channel is already closed
+            {
+                continue;
+            }
+
+            try
+            {
+                key.cancel();
+                ch.close();
+            }
+            catch( IOException e )
+            {
+                session.getManagerFilterChain().exceptionCaught( session, e );
+            }
+            finally
+            {
+                releaseWriteBuffers( session );
+
+                session.getManagerFilterChain().sessionClosed( session );
+                session.notifyClose();
+            }
+        }
+    }
+
+    private void processSessions( Set selectedKeys )
+    {
+        Iterator it = selectedKeys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+            SocketSession session = ( SocketSession ) key.attachment();
+
+            if( key.isReadable() )
+            {
+                read( session );
+            }
+
+            if( key.isWritable() )
+            {
+                scheduleFlush( session );
+            }
+        }
+
+        selectedKeys.clear();
+    }
+
+    private void read( SocketSession session )
+    {
+        ByteBuffer buf = ByteBuffer.allocate(
+                (( SocketSessionConfig ) session.getConfig()).getSessionReceiveBufferSize() ); 
+        SocketChannel ch = session.getChannel();
+
+        try
+        {
+            int readBytes = 0;
+            int ret;
+
+            buf.clear();
+
+            try
+            {
+                while( ( ret = ch.read( buf.buf() ) ) > 0 )
+                {
+                    readBytes += ret;
+                }
+            }
+            finally
+            {
+                buf.flip();
+            }
+
+            session.increaseReadBytes( readBytes );
+            session.setIdle( IdleStatus.BOTH_IDLE, false );
+            session.setIdle( IdleStatus.READER_IDLE, false );
+
+            if( readBytes > 0 )
+            {
+                ByteBuffer newBuf = ByteBuffer.allocate( readBytes );
+                newBuf.put( buf );
+                newBuf.flip();
+                session.getManagerFilterChain().messageReceived( session, newBuf );
+            }
+            if( ret < 0 )
+            {
+                scheduleRemove( session );
+            }
+        }
+        catch( Throwable e )
+        {
+            if( e instanceof IOException )
+                scheduleRemove( session );
+            session.getManagerFilterChain().exceptionCaught( session, e );
+        }
+        finally
+        {
+            buf.release();
+        }
+    }
+
+    private void scheduleRemove( SocketSession session )
+    {
+        synchronized( removingSessions )
+        {
+            removingSessions.push( session );
+        }
+    }
+
+    private void scheduleFlush( SocketSession session )
+    {
+        synchronized( flushingSessions )
+        {
+            flushingSessions.push( session );
+        }
+    }
+
+    private void notifyIdleSessions()
+    {
+        Set keys = selector.keys();
+        Iterator it;
+        SocketSession session;
+
+        // process idle sessions
+        long currentTime = System.currentTimeMillis();
+
+        if( ( keys != null ) && ( ( currentTime - lastIdleCheckTime ) >= 1000 ) )
+        {
+            lastIdleCheckTime = currentTime;
+            it = keys.iterator();
+
+            while( it.hasNext() )
+            {
+                SelectionKey key = ( SelectionKey ) it.next();
+                session = ( SocketSession ) key.attachment();
+
+                notifyIdleSession( session, currentTime );
+            }
+        }
+    }
+
+    private void notifyIdleSession( SocketSession session, long currentTime )
+    {
+        IoSessionConfig config = session.getConfig();
+
+        notifyIdleSession0( session, currentTime, config
+                .getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
+                            IdleStatus.BOTH_IDLE, session.getLastIoTime() );
+        notifyIdleSession0( session, currentTime, config
+                .getIdleTimeInMillis( IdleStatus.READER_IDLE ),
+                            IdleStatus.READER_IDLE, session.getLastReadTime() );
+        notifyIdleSession0( session, currentTime, config
+                .getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
+                            IdleStatus.WRITER_IDLE, session.getLastWriteTime() );
+
+        notifyWriteTimeoutSession( session, currentTime, config
+                .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+    }
+
+    private void notifyIdleSession0( SocketSession session, long currentTime,
+                                    long idleTime, IdleStatus status,
+                                    long lastIoTime )
+    {
+        if( idleTime > 0 && !session.isIdle( status ) && lastIoTime != 0
+            && ( currentTime - lastIoTime ) >= idleTime )
+        {
+            session.setIdle( status, true );
+            session.getManagerFilterChain().sessionIdle( session, status );
+        }
+    }
+
+    private void notifyWriteTimeoutSession( SocketSession session,
+                                           long currentTime,
+                                           long writeTimeout, long lastIoTime )
+    {
+        if( writeTimeout > 0
+            && ( currentTime - lastIoTime ) >= writeTimeout
+            && ( session.getSelectionKey().interestOps() & SelectionKey.OP_WRITE ) != 0 )
+        {
+            session
+                    .getManagerFilterChain()
+                    .exceptionCaught( session, new WriteTimeoutException() );
+        }
+    }
+
+    private void flushSessions()
+    {
+        if( flushingSessions.size() == 0 )
+            return;
+
+        for( ;; )
+        {
+            SocketSession session;
+
+            synchronized( flushingSessions )
+            {
+                session = ( SocketSession ) flushingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            if( !session.isConnected() )
+            {
+                releaseWriteBuffers( session );
+                continue;
+            }
+
+            try
+            {
+                flush( session );
+            }
+            catch( IOException e )
+            {
+                scheduleRemove( session );
+                session.getManagerFilterChain().exceptionCaught( session, e );
+            }
+        }
+    }
+    
+    private void releaseWriteBuffers( SocketSession session )
+    {
+        Queue writeBufferQueue = session.getWriteBufferQueue();
+        ByteBuffer buf;
+        
+        while( ( buf = (ByteBuffer) writeBufferQueue.pop() ) != null )
+        {
+            try
+            {
+                buf.release();
+            }
+            catch( IllegalStateException e )
+            {
+                session.getManagerFilterChain().exceptionCaught( session, e );
+            }
+        }
+    }
+
+    private void flush( SocketSession session ) throws IOException
+    {
+        SocketChannel ch = session.getChannel();
+
+        Queue writeBufferQueue = session.getWriteBufferQueue();
+
+        ByteBuffer buf;
+        for( ;; )
+        {
+            synchronized( writeBufferQueue )
+            {
+                buf = ( ByteBuffer ) writeBufferQueue.first();
+            }
+
+            if( buf == null )
+                break;
+
+            if( buf.remaining() == 0 )
+            {
+                synchronized( writeBufferQueue )
+                {
+                    writeBufferQueue.pop();
+                }
+                
+                // FIXME buffer is not released
+                session.getManagerFilterChain().messageSent( session, buf );
+                continue;
+            }
+
+            int writtenBytes = 0;
+            try
+            {
+                writtenBytes = ch.write( buf.buf() );
+            }
+            finally
+            {
+                if( writtenBytes > 0 )
+                {
+                    session.increaseWrittenBytes( writtenBytes );
+                    session.setIdle( IdleStatus.BOTH_IDLE, false );
+                    session.setIdle( IdleStatus.WRITER_IDLE, false );
+                }
+
+                SelectionKey key = session.getSelectionKey();
+                if( buf.hasRemaining() )
+                {
+                    // Kernel buffer is full
+                    key
+                            .interestOps( key.interestOps()
+                                          | SelectionKey.OP_WRITE );
+                    break;
+                }
+                else
+                {
+                    key.interestOps( key.interestOps()
+                                     & ( ~SelectionKey.OP_WRITE ) );
+                }
+            }
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "SocketIoProcessor" );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select( 1000 );
+                    addSessions();
+
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    flushSessions();
+                    removeSessions();
+                    notifyIdleSessions();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( SocketIoProcessor.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                newSessions.isEmpty() )
+                            {
+                                worker = null;
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    e.printStackTrace();
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: directory/network/branches/api_integration/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision