You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/10/19 10:56:38 UTC

svn commit: r326489 [1/2] - in /directory/network/trunk/src/java/org/apache/mina/transport: socket/nio/ socket/nio/support/ vmpipe/ vmpipe/support/

Author: trustin
Date: Wed Oct 19 01:56:19 2005
New Revision: 326489

URL: http://svn.apache.org/viewcvs?rev=326489&view=rev
Log:
Moved the classes used internally to the <original package name>.support package.

Added:
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java
      - copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java
      - copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java
      - copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
      - copied, changed from r326410, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSession.java
      - copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSession.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketSessionManagerFilterChain.java
      - copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSessionManagerFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipe.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java
      - copied, changed from r326410, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeFilter.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeIdleStatusChecker.java
      - copied, changed from r326410, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeIdleStatusChecker.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSession.java
      - copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/support/VmPipeSessionManagerFilterChain.java
      - copied, changed from r326451, directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSessionManagerFilterChain.java
Removed:
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionConfig.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSession.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSessionConfig.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketSessionManagerFilterChain.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeFilter.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeIdleStatusChecker.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSession.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeSessionManagerFilterChain.java
Modified:
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeAcceptor.java
    directory/network/trunk/src/java/org/apache/mina/transport/vmpipe/VmPipeConnector.java

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramAcceptor.java Wed Oct 19 01:56:19 2005
@@ -19,27 +19,15 @@
 package org.apache.mina.transport.socket.nio;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
 
-import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionManagerConfig;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoAcceptorConfig;
-import org.apache.mina.common.support.IoSessionManagerFilterChain;
-import org.apache.mina.util.ExceptionUtil;
-import org.apache.mina.util.Queue;
+import org.apache.mina.transport.socket.nio.support.DatagramAcceptorImpl;
 
 /**
  * {@link IoAcceptor} for datagram transport (UDP/IP).
@@ -47,569 +35,47 @@
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$
  */
-public class DatagramAcceptor extends DatagramSessionManager implements IoAcceptor
+public class DatagramAcceptor implements IoAcceptor
 {
-    private static volatile int nextId = 0;
-
-    private final IoSessionManagerFilterChain filters =
-        new DatagramSessionManagerFilterChain( this );
-
-    private final int id = nextId ++ ;
-
-    private Selector selector;
-
-    private final Map channels = new HashMap();
-
-    private final Queue registerQueue = new Queue();
-
-    private final Queue cancelQueue = new Queue();
-
-    private final Queue flushingSessions = new Queue();
-
-    private Worker worker;
-
-    /**
-     * Creates a new instance.
-     */
-    public DatagramAcceptor()
-    {
-    }
-
-    public void bind( SocketAddress address, IoHandler handler )
-            throws IOException
+    private final IoAcceptor stub = new DatagramAcceptorImpl();
+    
+    public void bind( SocketAddress address, IoHandler handler ) throws IOException
     {
-        if( address == null )
-            throw new NullPointerException( "address" );
-        if( handler == null )
-            throw new NullPointerException( "handler" );
-
-        if( !( address instanceof InetSocketAddress ) )
-            throw new IllegalArgumentException( "Unexpected address type: "
-                                                + address.getClass() );
-        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
-            throw new IllegalArgumentException( "Unsupported port number: 0" );
-        
-        RegistrationRequest request = new RegistrationRequest( address, handler );
-        synchronized( this )
-        {
-            synchronized( registerQueue )
-            {
-                registerQueue.push( request );
-            }
-            startupWorker();
-        }
-        selector.wakeup();
-        
-        synchronized( request )
-        {
-            while( !request.done )
-            {
-                try
-                {
-                    request.wait();
-                }
-                catch( InterruptedException e )
-                {
-                }
-            }
-        }
-        
-        if( request.exception != null )
-        {
-            ExceptionUtil.throwException( request.exception );
-        }
+        stub.bind( address, handler );
     }
 
     public void unbind( SocketAddress address )
     {
-        unbind( address, true );
+        stub.unbind( address );
     }
 
     public void unbind( SocketAddress address, boolean disconnectClients )
     {
-        // FIXME: Make this work properly when disconnectClients is true.
-        if( address == null )
-            throw new NullPointerException( "address" );
-
-        CancellationRequest request = new CancellationRequest( address );
-        synchronized( this )
-        {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                // IOException is thrown only when Worker thread is not
-                // running and failed to open a selector.  We simply throw
-                // IllegalArgumentException here because we can simply
-                // conclude that nothing is bound to the selector.
-                throw new IllegalArgumentException( "Address not bound: " + address );
-            }
-
-            synchronized( cancelQueue )
-            {
-                cancelQueue.push( request );
-            }
-        }
-        selector.wakeup();
-        
-        synchronized( request )
-        {
-            while( !request.done )
-            {
-                try
-                {
-                    request.wait();
-                }
-                catch( InterruptedException e )
-                {
-                }
-            }
-        }
-        
-        if( request.exception != null )
-        {
-            request.exception.fillInStackTrace();
-            throw request.exception;
-        }
-    }
-    
-    public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
-    {
-        if( remoteAddress == null )
-        {
-            throw new NullPointerException( "remoteAddress" );
-        }
-        if( localAddress == null )
-        {
-            throw new NullPointerException( "localAddress" );
-        }
-        
-        Selector selector = this.selector;
-        DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
-        if( selector == null || ch == null )
-        {
-            throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
-        }
-            
-        SelectionKey key = ch.keyFor( selector );
-        if( key == null )
-        {
-            throw new IllegalArgumentException( "Unknown lodalAddress: " + localAddress );
-        }
-
-        RegistrationRequest req = ( RegistrationRequest ) key.attachment();
-        DatagramSession s = new DatagramSession( filters, ch, req.handler );
-        s.setRemoteAddress( remoteAddress );
-        s.setSelectionKey( key );
-        
-        try
-        {
-            req.handler.sessionCreated( s );
-        }
-        catch( Throwable t )
-        {
-            exceptionMonitor.exceptionCaught( this, t );
-        }
-        
-        return s;
-    }
-
-    private synchronized void startupWorker() throws IOException
-    {
-        if( worker == null )
-        {
-            selector = Selector.open();
-            worker = new Worker();
-            worker.start();
-        }
-    }
-
-    void flushSession( DatagramSession session )
-    {
-        scheduleFlush( session );
-        selector.wakeup();
-    }
-
-    void closeSession( DatagramSession session )
-    {
-    }
-
-    private void scheduleFlush( DatagramSession session )
-    {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
-    }
-
-    private class Worker extends Thread
-    {
-        public Worker()
-        {
-            super( "DatagramAcceptor-" + id );
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                try
-                {
-                    int nKeys = selector.select();
-
-                    registerNew();
-
-                    if( nKeys > 0 )
-                    {
-                        processReadySessions( selector.selectedKeys() );
-                    }
-
-                    flushSessions();
-                    cancelKeys();
-
-                    if( selector.keys().isEmpty() )
-                    {
-                        synchronized( DatagramAcceptor.this )
-                        {
-                            if( selector.keys().isEmpty() &&
-                                registerQueue.isEmpty() &&
-                                cancelQueue.isEmpty() )
-                            {
-                                worker = null;
-                                try
-                                {
-                                    selector.close();
-                                }
-                                catch( IOException e )
-                                {
-                                    exceptionMonitor.exceptionCaught( DatagramAcceptor.this, e );
-                                }
-                                finally
-                                {
-                                    selector = null;
-                                }
-                                break;
-                            }
-                        }
-                    }
-                }
-                catch( IOException e )
-                {
-                    exceptionMonitor.exceptionCaught( DatagramAcceptor.this,
-                            e );
-
-                    try
-                    {
-                        Thread.sleep( 1000 );
-                    }
-                    catch( InterruptedException e1 )
-                    {
-                    }
-                }
-            }
-        }
-    }
-
-    private void processReadySessions( Set keys )
-    {
-        Iterator it = keys.iterator();
-        while( it.hasNext() )
-        {
-            SelectionKey key = ( SelectionKey ) it.next();
-            it.remove();
-
-            DatagramChannel ch = ( DatagramChannel ) key.channel();
-
-            RegistrationRequest req = ( RegistrationRequest ) key.attachment();
-            DatagramSession session = new DatagramSession(
-                    filters, ch, req.handler );
-            session.setSelectionKey( key );
-            
-            try
-            {
-                req.handler.sessionCreated( session );
-
-                if( key.isReadable() )
-                {
-                    readSession( session );
-                }
-
-                if( key.isWritable() )
-                {
-                    scheduleFlush( session );
-                }
-            }
-            catch( Throwable t )
-            {
-                exceptionMonitor.exceptionCaught( this, t );
-            }
-        }
-    }
-
-    private void readSession( DatagramSession session )
-    {
-
-        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
-        try
-        {
-            SocketAddress remoteAddress = session.getChannel().receive(
-                    readBuf.buf() );
-            if( remoteAddress != null )
-            {
-                readBuf.flip();
-                session.setRemoteAddress( remoteAddress );
-
-                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
-                newBuf.put( readBuf );
-                newBuf.flip();
-
-                session.increaseReadBytes( newBuf.remaining() );
-                filters.messageReceived( session, newBuf );
-            }
-        }
-        catch( IOException e )
-        {
-            filters.exceptionCaught( session, e );
-        }
-        finally
-        {
-            readBuf.release();
-        }
-    }
-
-    private void flushSessions()
-    {
-        if( flushingSessions.size() == 0 )
-            return;
-
-        for( ;; )
-        {
-            DatagramSession session;
-
-            synchronized( flushingSessions )
-            {
-                session = ( DatagramSession ) flushingSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            try
-            {
-                flush( session );
-            }
-            catch( IOException e )
-            {
-                session.getManagerFilterChain().exceptionCaught( session, e );
-            }
-        }
-    }
-
-    private void flush( DatagramSession session ) throws IOException
-    {
-        DatagramChannel ch = session.getChannel();
-
-        Queue writeRequestQueue = session.getWriteRequestQueue();
-
-        WriteRequest req;
-        for( ;; )
-        {
-            synchronized( writeRequestQueue )
-            {
-                req = ( WriteRequest ) writeRequestQueue.first();
-            }
-
-            if( req == null )
-                break;
-
-            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
-            if( buf.remaining() == 0 )
-            {
-                // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
-
-                req.getFuture().setWritten( true );
-                session.increaseWrittenWriteRequests();
-                session.getManagerFilterChain().messageSent( session, buf );
-                continue;
-            }
-
-            int pos = buf.position();
-            int writtenBytes = ch
-                    .send( buf.buf(), session.getRemoteAddress() );
-
-            SelectionKey key = session.getSelectionKey();
-            if( writtenBytes == 0 )
-            {
-                // Kernel buffer is full
-                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
-            }
-            else if( writtenBytes > 0 )
-            {
-                key.interestOps( key.interestOps()
-                                 & ( ~SelectionKey.OP_WRITE ) );
-
-                // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
-
-                session.increaseWrittenBytes( writtenBytes );
-                req.getFuture().setWritten( true );
-                session.increaseWrittenWriteRequests();
-                session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
-            }
-        }
+        stub.unbind( address, disconnectClients );
     }
 
-    private void registerNew()
+    public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
     {
-        if( registerQueue.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            RegistrationRequest req;
-            synchronized( registerQueue )
-            {
-                req = ( RegistrationRequest ) registerQueue.pop();
-            }
-
-            if( req == null )
-                break;
-
-            DatagramChannel ch = null;
-            try
-            {
-                ch = DatagramChannel.open();
-                ch.configureBlocking( false );
-                ch.socket().bind( req.address );
-                ch.register( selector, SelectionKey.OP_READ, req );
-                channels.put( req.address, ch );
-            }
-            catch( Throwable t )
-            {
-                req.exception = t;
-            }
-            finally
-            {
-                synchronized( req )
-                {
-                    req.done = true;
-                    req.notify();
-                }
-
-                if( ch != null && req.exception != null )
-                {
-                    try
-                    {
-                        ch.close();
-                    }
-                    catch( Throwable e )
-                    {
-                        exceptionMonitor.exceptionCaught( this, e );
-                    }
-                }
-            }
-        }
+        return stub.newSession( remoteAddress, localAddress );
     }
 
-    private void cancelKeys()
-    {
-        if( cancelQueue.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            CancellationRequest request;
-            synchronized( cancelQueue )
-            {
-                request = ( CancellationRequest ) cancelQueue.pop();
-            }
-            
-            if( request == null )
-            {
-                break;
-            }
-
-            DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
-            // close the channel
-            try
-            {
-                if( ch == null )
-                {
-                    request.exception = new IllegalArgumentException(
-                            "Address not bound: " + request.address );
-                }
-                else
-                {
-                    SelectionKey key = ch.keyFor( selector );
-                    key.cancel();
-                    selector.wakeup(); // wake up again to trigger thread death
-                    ch.close();
-                }
-            }
-            catch( Throwable t )
-            {
-                exceptionMonitor.exceptionCaught( this, t );
-            }
-            finally
-            {
-                synchronized( request )
-                {
-                    request.done = true;
-                    request.notify();
-                }
-            }
-        }
-    }
-    
-    /**
-     * This method just returns dummy configuration instance because datagram
-     * transport type (UDP/IP) doesn't provide any configuration.
-     */
     public IoSessionManagerConfig getConfig()
     {
-        return new BaseIoAcceptorConfig();
+        return stub.getConfig();
     }
 
     public IoFilterChain getFilterChain()
     {
-        return filters;
+        return stub.getFilterChain();
     }
 
-    private static class RegistrationRequest
+    public ExceptionMonitor getExceptionMonitor()
     {
-        private final SocketAddress address;
-        
-        private final IoHandler handler;
-        
-        private Throwable exception; 
-        
-        private boolean done;
-        
-        private RegistrationRequest( SocketAddress address, IoHandler handler )
-        {
-            this.address = address;
-            this.handler = handler;
-        }
+        return stub.getExceptionMonitor();
     }
 
-    private static class CancellationRequest
+    public void setExceptionMonitor( ExceptionMonitor monitor )
     {
-        private final SocketAddress address;
-        private boolean done;
-        private RuntimeException exception;
-        
-        private CancellationRequest( SocketAddress address )
-        {
-            this.address = address;
-        }
+        stub.setExceptionMonitor( monitor );
     }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramConnector.java Wed Oct 19 01:56:19 2005
@@ -19,24 +19,15 @@
 package org.apache.mina.transport.socket.nio;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
-import java.nio.channels.DatagramChannel;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.util.Iterator;
-import java.util.Set;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoSessionManagerConfig;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.common.support.BaseIoConnectorConfig;
-import org.apache.mina.common.support.IoSessionManagerFilterChain;
-import org.apache.mina.util.Queue;
+import org.apache.mina.transport.socket.nio.support.DatagramConnectorImpl;
 
 /**
  * {@link IoConnector} for datagram transport (UDP/IP).
@@ -44,24 +35,9 @@
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$
  */
-public class DatagramConnector extends DatagramSessionManager implements IoConnector
+public class DatagramConnector implements IoConnector
 {
-    private static volatile int nextId = 0;
-
-    private final IoSessionManagerFilterChain filters =
-        new DatagramSessionManagerFilterChain( this );
-
-    private final int id = nextId ++ ;
-
-    private Selector selector;
-
-    private final Queue registerQueue = new Queue();
-
-    private final Queue cancelQueue = new Queue();
-
-    private final Queue flushingSessions = new Queue();
-
-    private Worker worker;
+    private final IoConnector stub = new DatagramConnectorImpl();
 
     /**
      * Creates a new instance.
@@ -72,423 +48,18 @@
 
     public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
     {
-        return connect( address, null, handler);
+        return stub.connect( address, handler );
     }
 
     public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
                               IoHandler handler ) throws IOException
     {
-        if( address == null )
-            throw new NullPointerException( "address" );
-        if( handler == null )
-            throw new NullPointerException( "handler" );
-
-        if( !( address instanceof InetSocketAddress ) )
-            throw new IllegalArgumentException( "Unexpected address type: "
-                                                + address.getClass() );
-        
-        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
-        {
-            throw new IllegalArgumentException( "Unexpected local address type: "
-                                                + localAddress.getClass() );
-        }
-        
-        DatagramChannel ch = DatagramChannel.open();
-        boolean initialized = false;
-        try
-        {
-            ch.socket().setReuseAddress( true );
-            if( localAddress != null )
-            {
-                ch.socket().bind( localAddress );
-            }
-            ch.connect( address );
-            ch.configureBlocking( false );
-            initialized = true;
-        }
-        finally
-        {
-            if( !initialized )
-            {
-                ch.close();
-            }
-        }
-
-        RegistrationRequest request = new RegistrationRequest( ch, handler );
-        synchronized( this )
-        {
-            synchronized( registerQueue )
-            {
-                registerQueue.push( request );
-            }
-            startupWorker();
-        }
-
-        selector.wakeup();
-        return request;
-    }
-    
-    private synchronized void startupWorker() throws IOException
-    {
-        if( worker == null )
-        {
-            selector = Selector.open();
-            worker = new Worker();
-            worker.start();
-        }
-    }
-
-    void closeSession( DatagramSession session )
-    {
-        synchronized( this )
-        {
-            try
-            {
-                startupWorker();
-            }
-            catch( IOException e )
-            {
-                // IOException is thrown only when Worker thread is not
-                // running and failed to open a selector.  We simply return
-                // silently here because it we can simply conclude that
-                // this session is not managed by this connector or
-                // already closed.
-                return;
-            }
-
-            synchronized( cancelQueue )
-            {
-                cancelQueue.push( session );
-            }
-        }
-
-        selector.wakeup();
-    }
-
-    void flushSession( DatagramSession session )
-    {
-        scheduleFlush( session );
-        selector.wakeup();
-    }
-
-    private void scheduleFlush( DatagramSession session )
-    {
-        synchronized( flushingSessions )
-        {
-            flushingSessions.push( session );
-        }
-    }
-
-    private class Worker extends Thread
-    {
-        public Worker()
-        {
-            super( "DatagramAcceptor-" + id );
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                try
-                {
-                    int nKeys = selector.select();
-
-                    registerNew();
-
-                    if( nKeys > 0 )
-                    {
-                        processReadySessions( selector.selectedKeys() );
-                    }
-
-                    flushSessions();
-                    cancelKeys();
-
-                    if( selector.keys().isEmpty() )
-                    {
-                        synchronized( DatagramConnector.this )
-                        {
-                            if( selector.keys().isEmpty() &&
-                                registerQueue.isEmpty() &&
-                                cancelQueue.isEmpty() )
-                            {
-                                worker = null;
-                                try
-                                {
-                                    selector.close();
-                                }
-                                catch( IOException e )
-                                {
-                                    exceptionMonitor.exceptionCaught( DatagramConnector.this, e );
-                                }
-                                finally
-                                {
-                                    selector = null;
-                                }
-                                break;
-                            }
-                        }
-                    }
-                }
-                catch( IOException e )
-                {
-                    exceptionMonitor.exceptionCaught( DatagramConnector.this,
-                            e );
-
-                    try
-                    {
-                        Thread.sleep( 1000 );
-                    }
-                    catch( InterruptedException e1 )
-                    {
-                    }
-                }
-            }
-        }
-    }
-
-    private void processReadySessions( Set keys )
-    {
-        Iterator it = keys.iterator();
-        while( it.hasNext() )
-        {
-            SelectionKey key = ( SelectionKey ) it.next();
-            it.remove();
-
-            DatagramSession session = ( DatagramSession ) key.attachment();
-
-            if( key.isReadable() )
-            {
-                readSession( session );
-            }
-
-            if( key.isWritable() )
-            {
-                scheduleFlush( session );
-            }
-        }
-    }
-
-    private void readSession( DatagramSession session )
-    {
-
-        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
-        try
-        {
-            int readBytes = session.getChannel().read( readBuf.buf() );
-            if( readBytes > 0 )
-            {
-                readBuf.flip();
-                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
-                newBuf.put( readBuf );
-                newBuf.flip();
-
-                session.increaseReadBytes( readBytes );
-                filters.messageReceived( session, newBuf );
-            }
-        }
-        catch( IOException e )
-        {
-            filters.exceptionCaught( session, e );
-        }
-        finally
-        {
-            readBuf.release();
-        }
-    }
-
-    private void flushSessions()
-    {
-        if( flushingSessions.size() == 0 )
-            return;
-
-        for( ;; )
-        {
-            DatagramSession session;
-
-            synchronized( flushingSessions )
-            {
-                session = ( DatagramSession ) flushingSessions.pop();
-            }
-
-            if( session == null )
-                break;
-
-            try
-            {
-                flush( session );
-            }
-            catch( IOException e )
-            {
-                session.getManagerFilterChain().exceptionCaught( session, e );
-            }
-        }
-    }
-
-    private void flush( DatagramSession session ) throws IOException
-    {
-        DatagramChannel ch = session.getChannel();
-
-        Queue writeRequestQueue = session.getWriteRequestQueue();
-
-        WriteRequest req;
-        for( ;; )
-        {
-            synchronized( writeRequestQueue )
-            {
-                req = ( WriteRequest ) writeRequestQueue.first();
-            }
-
-            if( req == null )
-                break;
-
-            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
-            if( buf.remaining() == 0 )
-            {
-                // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
-
-                req.getFuture().setWritten( true );
-                session.increaseWrittenWriteRequests();
-                session.getManagerFilterChain().messageSent( session, buf );
-                continue;
-            }
-
-            int pos = buf.position();
-            int writtenBytes = ch.write( buf.buf() );
-
-            SelectionKey key = session.getSelectionKey();
-            if( writtenBytes == 0 )
-            {
-                // Kernel buffer is full
-                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
-            }
-            else if( writtenBytes > 0 )
-            {
-                key.interestOps( key.interestOps()
-                                 & ( ~SelectionKey.OP_WRITE ) );
-
-                // pop and fire event
-                synchronized( writeRequestQueue )
-                {
-                    writeRequestQueue.pop();
-                }
-
-                session.increaseWrittenBytes( writtenBytes );
-                req.getFuture().setWritten( true );
-                session.increaseWrittenWriteRequests();
-                session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
-            }
-        }
-    }
-
-    private void registerNew()
-    {
-        if( registerQueue.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            RegistrationRequest req;
-            synchronized( registerQueue )
-            {
-                req = ( RegistrationRequest ) registerQueue.pop();
-            }
-
-            if( req == null )
-                break;
-
-            DatagramSession session = new DatagramSession(
-                    filters, req.channel, req.handler );
-
-            boolean success = false;
-            try
-            {
-                req.handler.sessionCreated( session );
-
-                SelectionKey key = req.channel.register( selector,
-                        SelectionKey.OP_READ, session );
-    
-                session.setSelectionKey( key );
-
-                req.setSession( session );
-                success = true;
-            }
-            catch( Throwable t )
-            {
-                req.setException( t );
-            }
-            finally 
-            {
-                if( !success )
-                {
-                    try
-                    {
-                        req.channel.close();
-                    }
-                    catch (IOException e)
-                    {
-                        exceptionMonitor.exceptionCaught( this, e );
-                    }
-                }
-            }
-        }
-    }
-
-    private void cancelKeys()
-    {
-        if( cancelQueue.isEmpty() )
-            return;
-
-        for( ;; )
-        {
-            DatagramSession session;
-            synchronized( cancelQueue )
-            {
-                session = ( DatagramSession ) cancelQueue.pop();
-            }
-
-            if( session == null )
-                break;
-            else
-            {
-                SelectionKey key = session.getSelectionKey();
-                DatagramChannel ch = ( DatagramChannel ) key.channel();
-                try
-                {
-                    ch.close();
-                }
-                catch( IOException e )
-                {
-                    exceptionMonitor.exceptionCaught( this, e );
-                }
-                session.notifyClose();
-                key.cancel();
-                selector.wakeup(); // wake up again to trigger thread death
-            }
-        }
+        return stub.connect( address, localAddress, handler );
     }
 
     public IoFilterChain getFilterChain()
     {
-        return filters;
-    }
-
-    private static class RegistrationRequest extends ConnectFuture
-    {
-        private final DatagramChannel channel;
-
-        private final IoHandler handler;
-
-        private RegistrationRequest( DatagramChannel channel,
-                                     IoHandler handler )
-        {
-            this.channel = channel;
-            this.handler = handler;
-        }
+        return stub.getFilterChain();
     }
 
     /**
@@ -497,6 +68,16 @@
      */
     public IoSessionManagerConfig getConfig()
     {
-        return new BaseIoConnectorConfig();
+        return stub.getConfig();
+    }
+
+    public ExceptionMonitor getExceptionMonitor()
+    {
+        return stub.getExceptionMonitor();
+    }
+
+    public void setExceptionMonitor( ExceptionMonitor monitor )
+    {
+        stub.setExceptionMonitor( monitor );
     }
 }

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Wed Oct 19 01:56:19 2005
@@ -36,6 +36,9 @@
 import org.apache.mina.common.IoSessionManagerConfig;
 import org.apache.mina.common.support.BaseIoAcceptor;
 import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
+import org.apache.mina.transport.socket.nio.support.SocketSession;
+import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
 import org.apache.mina.util.Queue;
 
 /**

Modified: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=326489&r1=326488&r2=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketConnector.java Wed Oct 19 01:56:19 2005
@@ -37,6 +37,9 @@
 import org.apache.mina.common.support.BaseIoConnectorConfig;
 import org.apache.mina.common.support.BaseIoSessionManager;
 import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.transport.socket.nio.support.SocketIoProcessor;
+import org.apache.mina.transport.socket.nio.support.SocketSession;
+import org.apache.mina.transport.socket.nio.support.SocketSessionManagerFilterChain;
 import org.apache.mina.util.ExceptionUtil;
 import org.apache.mina.util.Queue;
 

Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java?rev=326489&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramAcceptorImpl.java Wed Oct 19 01:56:19 2005
@@ -0,0 +1,616 @@
+/*
+ *   @(#) $Id: DatagramAcceptor.java 326451 2005-10-19 08:07:09Z trustin $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionManagerConfig;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.common.support.BaseIoAcceptorConfig;
+import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.util.ExceptionUtil;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoAcceptor} for datagram transport (UDP/IP).
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev: 326451 $, $Date: 2005-10-19 17:07:09 +0900 (수, 19 10 2005) $
+ */
+public class DatagramAcceptorImpl extends BaseIoAcceptor implements IoAcceptor, DatagramSessionManager
+{
+    private static volatile int nextId = 0;
+
+    private final IoSessionManagerFilterChain filters =
+        new DatagramSessionManagerFilterChain( this );
+
+    private final int id = nextId ++ ;
+
+    private Selector selector;
+
+    private final Map channels = new HashMap();
+
+    private final Queue registerQueue = new Queue();
+
+    private final Queue cancelQueue = new Queue();
+
+    private final Queue flushingSessions = new Queue();
+
+    private Worker worker;
+
+    /**
+     * Creates a new instance.
+     */
+    public DatagramAcceptorImpl()
+    {
+    }
+
+    /**
+     * Binds <tt>address</tt> and waits until the worker thread has processed the
+     * registration request.
+     */
+    public void bind( SocketAddress address, IoHandler handler ) throws IOException
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+        if( !( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+            throw new IllegalArgumentException( "Unsupported port number: 0" );
+
+        RegistrationRequest request = new RegistrationRequest( address, handler );
+        Selector selector;
+        synchronized( this )
+        {
+            synchronized( registerQueue )
+            {
+                registerQueue.push( request );
+            }
+            startupWorker();
+            selector = this.selector; // snapshot: the worker nulls the field on shutdown
+        }
+        selector.wakeup();
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        if( request.exception != null )
+            ExceptionUtil.throwException( request.exception );
+    }
+
+    public void unbind( SocketAddress address )
+    {
+        unbind( address, true );
+    }
+
+    /**
+     * Unbinds <tt>address</tt> and waits until the worker has processed the request.
+     * @throws IllegalArgumentException if nothing is bound to <tt>address</tt>
+     */
+    public void unbind( SocketAddress address, boolean disconnectClients )
+    {
+        // FIXME: Make this work properly when disconnectClients is true.
+        if( address == null )
+            throw new NullPointerException( "address" );
+
+        CancellationRequest request = new CancellationRequest( address );
+        Selector selector;
+        synchronized( this )
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch( IOException e )
+            {
+                // No worker is running and no selector could be opened: nothing is bound.
+                throw new IllegalArgumentException( "Address not bound: " + address );
+            }
+            selector = this.selector; // snapshot: the worker nulls the field on shutdown
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( request );
+            }
+        }
+        selector.wakeup();
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+        }
+        if( request.exception != null )
+        {
+            request.exception.fillInStackTrace();
+            throw request.exception;
+        }
+    }
+    
+    /**
+     * Creates a new session that talks to <tt>remoteAddress</tt> through the
+     * channel which was bound to <tt>localAddress</tt>.  The handler passed to
+     * <tt>bind</tt> for that address receives <tt>sessionCreated</tt>; anything
+     * it throws is forwarded to the exception monitor.
+     *
+     * @throws IllegalArgumentException if no channel is registered for <tt>localAddress</tt>
+     */
+    public IoSession newSession( SocketAddress remoteAddress, SocketAddress localAddress )
+    {
+        if( remoteAddress == null )
+            throw new NullPointerException( "remoteAddress" );
+        if( localAddress == null )
+            throw new NullPointerException( "localAddress" );
+
+        // Work on a snapshot: the worker clears the field when it shuts down.
+        Selector selector = this.selector;
+        DatagramChannel ch = ( DatagramChannel ) channels.get( localAddress );
+        if( selector == null || ch == null )
+            throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
+
+        SelectionKey key = ch.keyFor( selector );
+        if( key == null )
+            throw new IllegalArgumentException( "Unknown localAddress: " + localAddress );
+
+        RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+        DatagramSession s = new DatagramSession( filters, ch, req.handler );
+        s.setRemoteAddress( remoteAddress );
+        s.setSelectionKey( key );
+
+        try
+        {
+            req.handler.sessionCreated( s );
+        }
+        catch( Throwable t )
+        {
+            exceptionMonitor.exceptionCaught( this, t );
+        }
+        return s;
+    }
+
+    private synchronized void startupWorker() throws IOException
+    {
+        if( worker == null )
+        {
+            selector = Selector.open();
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    public void flushSession( DatagramSession session )
+    {
+        scheduleFlush( session );
+        selector.wakeup();
+    }
+
+    public void closeSession( DatagramSession session )
+    {
+    }
+
+    private void scheduleFlush( DatagramSession session )
+    {
+        synchronized( flushingSessions )
+        {
+            flushingSessions.push( session );
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "DatagramAcceptor-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+
+                    if( nKeys > 0 )
+                    {
+                        processReadySessions( selector.selectedKeys() );
+                    }
+
+                    flushSessions();
+                    cancelKeys();
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( DatagramAcceptorImpl.this )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    exceptionMonitor.exceptionCaught( DatagramAcceptorImpl.this, e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor.exceptionCaught( DatagramAcceptorImpl.this,
+                            e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                    }
+                }
+            }
+        }
+    }
+
+    private void processReadySessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+            it.remove();
+
+            DatagramChannel ch = ( DatagramChannel ) key.channel();
+
+            RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+            DatagramSession session = new DatagramSession(
+                    filters, ch, req.handler );
+            session.setSelectionKey( key );
+            
+            try
+            {
+                req.handler.sessionCreated( session );
+
+                if( key.isReadable() )
+                {
+                    readSession( session );
+                }
+
+                if( key.isWritable() )
+                {
+                    scheduleFlush( session );
+                }
+            }
+            catch( Throwable t )
+            {
+                exceptionMonitor.exceptionCaught( this, t );
+            }
+        }
+    }
+
+    private void readSession( DatagramSession session )
+    {
+
+        ByteBuffer readBuf = ByteBuffer.allocate( 2048 );
+        try
+        {
+            SocketAddress remoteAddress = session.getChannel().receive(
+                    readBuf.buf() );
+            if( remoteAddress != null )
+            {
+                readBuf.flip();
+                session.setRemoteAddress( remoteAddress );
+
+                ByteBuffer newBuf = ByteBuffer.allocate( readBuf.limit() );
+                newBuf.put( readBuf );
+                newBuf.flip();
+
+                session.increaseReadBytes( newBuf.remaining() );
+                filters.messageReceived( session, newBuf );
+            }
+        }
+        catch( IOException e )
+        {
+            filters.exceptionCaught( session, e );
+        }
+        finally
+        {
+            readBuf.release();
+        }
+    }
+
+    private void flushSessions()
+    {
+        if( flushingSessions.size() == 0 )
+            return;
+
+        for( ;; )
+        {
+            DatagramSession session;
+
+            synchronized( flushingSessions )
+            {
+                session = ( DatagramSession ) flushingSessions.pop();
+            }
+
+            if( session == null )
+                break;
+
+            try
+            {
+                flush( session );
+            }
+            catch( IOException e )
+            {
+                session.getManagerFilterChain().exceptionCaught( session, e );
+            }
+        }
+    }
+
+    private void flush( DatagramSession session ) throws IOException
+    {
+        DatagramChannel ch = session.getChannel();
+
+        Queue writeRequestQueue = session.getWriteRequestQueue();
+
+        WriteRequest req;
+        for( ;; )
+        {
+            synchronized( writeRequestQueue )
+            {
+                req = ( WriteRequest ) writeRequestQueue.first();
+            }
+
+            if( req == null )
+                break;
+
+            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+            if( buf.remaining() == 0 )
+            {
+                // pop and fire event
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+
+                req.getFuture().setWritten( true );
+                session.increaseWrittenWriteRequests();
+                session.getManagerFilterChain().messageSent( session, buf );
+                continue;
+            }
+
+            int pos = buf.position();
+            int writtenBytes = ch
+                    .send( buf.buf(), session.getRemoteAddress() );
+
+            SelectionKey key = session.getSelectionKey();
+            if( writtenBytes == 0 )
+            {
+                // Kernel buffer is full
+                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+            }
+            else if( writtenBytes > 0 )
+            {
+                key.interestOps( key.interestOps()
+                                 & ( ~SelectionKey.OP_WRITE ) );
+
+                // pop and fire event
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+
+                session.increaseWrittenBytes( writtenBytes );
+                req.getFuture().setWritten( true );
+                session.increaseWrittenWriteRequests();
+                session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
+            }
+        }
+    }
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            RegistrationRequest req;
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+                break;
+
+            DatagramChannel ch = null;
+            try
+            {
+                ch = DatagramChannel.open();
+                ch.configureBlocking( false );
+                ch.socket().bind( req.address );
+                ch.register( selector, SelectionKey.OP_READ, req );
+                channels.put( req.address, ch );
+            }
+            catch( Throwable t )
+            {
+                req.exception = t;
+            }
+            finally
+            {
+                synchronized( req )
+                {
+                    req.done = true;
+                    req.notify();
+                }
+
+                if( ch != null && req.exception != null )
+                {
+                    try
+                    {
+                        ch.close();
+                    }
+                    catch( Throwable e )
+                    {
+                        exceptionMonitor.exceptionCaught( this, e );
+                    }
+                }
+            }
+        }
+    }
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+            return;
+
+        for( ;; )
+        {
+            CancellationRequest request;
+            synchronized( cancelQueue )
+            {
+                request = ( CancellationRequest ) cancelQueue.pop();
+            }
+            
+            if( request == null )
+            {
+                break;
+            }
+
+            DatagramChannel ch = ( DatagramChannel ) channels.remove( request.address );
+            // close the channel
+            try
+            {
+                if( ch == null )
+                {
+                    request.exception = new IllegalArgumentException(
+                            "Address not bound: " + request.address );
+                }
+                else
+                {
+                    SelectionKey key = ch.keyFor( selector );
+                    key.cancel();
+                    selector.wakeup(); // wake up again to trigger thread death
+                    ch.close();
+                }
+            }
+            catch( Throwable t )
+            {
+                exceptionMonitor.exceptionCaught( this, t );
+            }
+            finally
+            {
+                synchronized( request )
+                {
+                    request.done = true;
+                    request.notify();
+                }
+            }
+        }
+    }
+    
+    /**
+     * This method just returns a dummy configuration instance because the
+     * datagram transport type (UDP/IP) doesn't provide any configuration.
+     */
+    public IoSessionManagerConfig getConfig()
+    {
+        return new BaseIoAcceptorConfig();
+    }
+
+    public IoFilterChain getFilterChain()
+    {
+        return filters;
+    }
+
+    private static class RegistrationRequest
+    {
+        private final SocketAddress address;
+        
+        private final IoHandler handler;
+        
+        private Throwable exception; 
+        
+        private boolean done;
+        
+        private RegistrationRequest( SocketAddress address, IoHandler handler )
+        {
+            this.address = address;
+            this.handler = handler;
+        }
+    }
+
+    private static class CancellationRequest
+    {
+        private final SocketAddress address;
+        private boolean done;
+        private RuntimeException exception;
+        
+        private CancellationRequest( SocketAddress address )
+        {
+            this.address = address;
+        }
+    }
+}

Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java?rev=326489&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramConnectorImpl.java Wed Oct 19 01:56:19 2005
@@ -0,0 +1,503 @@
+/*
+ *   @(#) $Id: DatagramConnector.java 326451 2005-10-19 08:07:09Z trustin $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.DatagramChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoFilterChain;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoSessionManagerConfig;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.support.BaseIoConnectorConfig;
+import org.apache.mina.common.support.BaseIoSessionManager;
+import org.apache.mina.common.support.IoSessionManagerFilterChain;
+import org.apache.mina.util.Queue;
+
+/**
+ * {@link IoConnector} for datagram transport (UDP/IP).
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev: 326451 $, $Date: 2005-10-19 17:07:09 +0900 (수, 19 10 2005) $
+ */
+public class DatagramConnectorImpl extends BaseIoSessionManager implements IoConnector, DatagramSessionManager
+{
+    // Source of the per-instance 'id' below.
+    // NOTE(review): 'nextId ++' on a volatile field is a read-modify-write and
+    // is not atomic, so two connectors constructed concurrently could obtain
+    // the same id.  The id is only used in the worker thread's name.
+    private static volatile int nextId = 0;
+
+    // Filter chain handed to every DatagramSession created by registerNew()
+    // and used to fire messageReceived/exceptionCaught from readSession().
+    private final IoSessionManagerFilterChain filters =
+        new DatagramSessionManagerFilterChain( this );
+
+    // Suffix of the worker thread name ("DatagramConnector-" + id).
+    private final int id = nextId ++ ;
+
+    // Opened by startupWorker(); closed and reset to null by the worker
+    // thread when it terminates.  Null while no worker is running.
+    private Selector selector;
+
+    // RegistrationRequests pushed by connect() and popped by registerNew().
+    private final Queue registerQueue = new Queue();
+
+    // Sessions pushed by closeSession() and popped by cancelKeys().
+    private final Queue cancelQueue = new Queue();
+
+    // Sessions pushed by scheduleFlush() and popped by flushSessions().
+    private final Queue flushingSessions = new Queue();
+
+    // The running worker thread, or null when none is running.
+    private Worker worker;
+
+    /**
+     * Creates a new instance.  No selector or worker thread is created here;
+     * both are created by <tt>startupWorker()</tt> when first needed.
+     */
+    public DatagramConnectorImpl()
+    {
+    }
+
+    /**
+     * Connects to <tt>address</tt> without an explicit local bind address.
+     * Delegates to the three-argument <tt>connect</tt> with a <tt>null</tt>
+     * local address.
+     */
+    public ConnectFuture connect( SocketAddress address, IoHandler handler ) throws IOException
+    {
+        final SocketAddress noLocalAddress = null;
+        return connect( address, noLocalAddress, handler );
+    }
+
+    /**
+     * Opens a non-blocking datagram channel connected to <tt>address</tt> and
+     * queues it for registration by the worker thread.
+     *
+     * @param address      the remote address; must be an {@link InetSocketAddress}
+     * @param localAddress the local address to bind to, or <tt>null</tt> to
+     *                     skip the explicit bind; must be an
+     *                     {@link InetSocketAddress} when given
+     * @param handler      the handler for the session to be created
+     * @return the queued request, which the worker thread later completes
+     *         with either the new session or the registration failure
+     * @throws NullPointerException     if <tt>address</tt> or <tt>handler</tt>
+     *                                  is <tt>null</tt>
+     * @throws IllegalArgumentException if an address has an unexpected type
+     * @throws IOException              if the channel cannot be set up or the
+     *                                  worker's selector cannot be opened
+     */
+    public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+                              IoHandler handler ) throws IOException
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+
+        if( !( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+        
+        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+        {
+            throw new IllegalArgumentException( "Unexpected local address type: "
+                                                + localAddress.getClass() );
+        }
+        
+        DatagramChannel ch = DatagramChannel.open();
+        boolean initialized = false;
+        try
+        {
+            ch.socket().setReuseAddress( true );
+            if( localAddress != null )
+            {
+                ch.socket().bind( localAddress );
+            }
+            ch.connect( address );
+            ch.configureBlocking( false );
+            initialized = true;
+        }
+        finally
+        {
+            if( !initialized )
+            {
+                ch.close();
+            }
+        }
+
+        RegistrationRequest request = new RegistrationRequest( ch, handler );
+        boolean queued = false;
+        try
+        {
+            synchronized( this )
+            {
+                // Start the worker before queueing so that a failure to open
+                // the selector does not leave an orphaned request in the
+                // queue.  The worker cannot terminate in between because its
+                // shutdown check needs this same lock.
+                startupWorker();
+
+                synchronized( registerQueue )
+                {
+                    registerQueue.push( request );
+                }
+                queued = true;
+
+                // Wake up while still holding the lock: the worker resets
+                // 'selector' to null only under this lock, so it is non-null
+                // here.
+                selector.wakeup();
+            }
+        }
+        finally
+        {
+            if( !queued )
+            {
+                // Nobody will ever register this channel; do not leak it.
+                ch.close();
+            }
+        }
+
+        return request;
+    }
+    
+    /**
+     * Opens the selector and starts a worker thread unless a worker is
+     * already running.
+     *
+     * @throws IOException if the selector cannot be opened
+     */
+    private synchronized void startupWorker() throws IOException
+    {
+        if( worker != null )
+        {
+            return;
+        }
+
+        selector = Selector.open();
+        worker = new Worker();
+        worker.start();
+    }
+
+    /**
+     * Queues <tt>session</tt> for closing by the worker thread, starting a
+     * worker first if none is running.  Returns silently if the worker's
+     * selector cannot be opened.
+     *
+     * @param session the session to close
+     */
+    public void closeSession( DatagramSession session )
+    {
+        synchronized( this )
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch( IOException e )
+            {
+                // IOException is thrown only when the Worker thread is not
+                // running and a selector could not be opened.  We simply
+                // return silently here because we can conclude that this
+                // session is not managed by this connector or is already
+                // closed.
+                return;
+            }
+
+            synchronized( cancelQueue )
+            {
+                cancelQueue.push( session );
+            }
+
+            // Wake up while still holding the lock: the worker resets
+            // 'selector' to null only under this lock, so it is non-null here.
+            selector.wakeup();
+        }
+    }
+
+    /**
+     * Queues <tt>session</tt> for flushing and wakes up the worker thread if
+     * one is running.
+     *
+     * @param session the session whose write requests should be flushed
+     */
+    public void flushSession( DatagramSession session )
+    {
+        scheduleFlush( session );
+
+        // The worker resets 'selector' to null when it terminates, so read
+        // the field once and skip the wake-up when no worker is running.
+        Selector currentSelector = this.selector;
+        if( currentSelector != null )
+        {
+            currentSelector.wakeup();
+        }
+    }
+
+    /**
+     * Appends <tt>session</tt> to the queue of sessions waiting to be
+     * flushed, without waking up the selector.
+     */
+    private void scheduleFlush( DatagramSession session )
+    {
+        final Queue pending = flushingSessions;
+        synchronized( pending )
+        {
+            pending.push( session );
+        }
+    }
+
+    /**
+     * The thread that runs the select loop: it registers queued channels,
+     * reads from ready sessions, flushes queued writes and closes queued
+     * sessions.  It terminates itself once the selector has no keys and the
+     * register and cancel queues are empty.
+     */
+    private class Worker extends Thread
+    {
+        public Worker()
+        {
+            super( "DatagramConnector-" + id );
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                try
+                {
+                    // Blocks until a key is ready or wakeup() is called.
+                    int nKeys = selector.select();
+
+                    registerNew();
+
+                    if( nKeys > 0 )
+                    {
+                        processReadySessions( selector.selectedKeys() );
+                    }
+
+                    flushSessions();
+                    cancelKeys();
+
+                    // Cheap unlocked check first; the decision to terminate
+                    // is re-checked under the connector's lock so that it
+                    // cannot interleave with code holding the same lock.
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( DatagramConnectorImpl.this )
+                        {
+                            // NOTE(review): flushingSessions is not part of
+                            // this condition - confirm that is intended.
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                // Clearing 'worker' lets startupWorker()
+                                // create a fresh thread and selector later.
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    exceptionMonitor.exceptionCaught( DatagramConnectorImpl.this, e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    exceptionMonitor.exceptionCaught( DatagramConnectorImpl.this,
+                            e );
+
+                    // Pause before retrying so a persistent failure does not
+                    // turn into a tight loop.
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                        // Ignored: the loop simply continues.
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Drains <tt>keys</tt>: each key is removed from the set, its attached
+     * session is read if the key is readable, and scheduled for flushing if
+     * the key is writable.
+     */
+    private void processReadySessions( Set keys )
+    {
+        for( Iterator i = keys.iterator(); i.hasNext(); )
+        {
+            SelectionKey readyKey = ( SelectionKey ) i.next();
+            i.remove();
+
+            DatagramSession readySession =
+                ( DatagramSession ) readyKey.attachment();
+
+            if( readyKey.isReadable() )
+            {
+                readSession( readySession );
+            }
+
+            if( readyKey.isWritable() )
+            {
+                scheduleFlush( readySession );
+            }
+        }
+    }
+
+    /**
+     * Reads once from the session's channel into a 2048-byte scratch buffer.
+     * If any bytes were read, they are copied into a right-sized buffer that
+     * is fired as <tt>messageReceived</tt>.  An {@link IOException} is fired
+     * as <tt>exceptionCaught</tt>.  The scratch buffer is always released.
+     */
+    private void readSession( DatagramSession session )
+    {
+        final ByteBuffer scratch = ByteBuffer.allocate( 2048 );
+        try
+        {
+            final int count = session.getChannel().read( scratch.buf() );
+            if( count <= 0 )
+            {
+                return;
+            }
+
+            scratch.flip();
+
+            // Hand the filters a copy so the scratch buffer can be released.
+            final ByteBuffer message = ByteBuffer.allocate( scratch.limit() );
+            message.put( scratch );
+            message.flip();
+
+            session.increaseReadBytes( count );
+            filters.messageReceived( session, message );
+        }
+        catch( IOException cause )
+        {
+            filters.exceptionCaught( session, cause );
+        }
+        finally
+        {
+            scratch.release();
+        }
+    }
+
+    /**
+     * Flushes every session currently queued for flushing.  An
+     * {@link IOException} raised while flushing a session is fired as
+     * <tt>exceptionCaught</tt> on that session's manager filter chain and
+     * does not stop the remaining sessions from being flushed.
+     */
+    private void flushSessions()
+    {
+        if( flushingSessions.size() != 0 )
+        {
+            for( ;; )
+            {
+                DatagramSession next;
+                synchronized( flushingSessions )
+                {
+                    next = ( DatagramSession ) flushingSessions.pop();
+                }
+
+                if( next == null )
+                {
+                    return;
+                }
+
+                try
+                {
+                    flush( next );
+                }
+                catch( IOException cause )
+                {
+                    next.getManagerFilterChain().exceptionCaught( next, cause );
+                }
+            }
+        }
+    }
+
+    /**
+     * Writes the queued write requests of <tt>session</tt> to its channel, in
+     * order, until the queue is empty or the channel accepts no more data.
+     * <p>
+     * A request whose buffer is already empty is completed without touching
+     * the channel.  When a write transfers nothing, <tt>OP_WRITE</tt> interest
+     * is set and the method returns, leaving the request at the head of the
+     * queue; <tt>processReadySessions()</tt> reschedules the flush once the
+     * key becomes writable.
+     *
+     * @param session the session to flush
+     * @throws IOException if writing to the channel fails
+     */
+    private void flush( DatagramSession session ) throws IOException
+    {
+        DatagramChannel ch = session.getChannel();
+
+        Queue writeRequestQueue = session.getWriteRequestQueue();
+
+        WriteRequest req;
+        for( ;; )
+        {
+            synchronized( writeRequestQueue )
+            {
+                req = ( WriteRequest ) writeRequestQueue.first();
+            }
+
+            if( req == null )
+                break;
+
+            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
+            if( buf.remaining() == 0 )
+            {
+                // pop and fire event
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+
+                req.getFuture().setWritten( true );
+                session.increaseWrittenWriteRequests();
+                session.getManagerFilterChain().messageSent( session, buf );
+                continue;
+            }
+
+            // Remember where the write started so that messageSent can be
+            // fired with the buffer rewound to that position.
+            int pos = buf.position();
+            int writtenBytes = ch.write( buf.buf() );
+
+            SelectionKey key = session.getSelectionKey();
+            if( writtenBytes == 0 )
+            {
+                // Kernel buffer is full
+                key.interestOps( key.interestOps() | SelectionKey.OP_WRITE );
+
+                // Stop here instead of retrying the same request at once,
+                // which would spin the worker thread until the kernel
+                // accepts the datagram.
+                break;
+            }
+            else if( writtenBytes > 0 )
+            {
+                key.interestOps( key.interestOps()
+                                 & ( ~SelectionKey.OP_WRITE ) );
+
+                // pop and fire event
+                synchronized( writeRequestQueue )
+                {
+                    writeRequestQueue.pop();
+                }
+
+                session.increaseWrittenBytes( writtenBytes );
+                req.getFuture().setWritten( true );
+                session.increaseWrittenWriteRequests();
+                session.getManagerFilterChain().messageSent( session, buf.position( pos ) );
+            }
+        }
+    }
+
+    /**
+     * Processes every queued {@link RegistrationRequest}.  For each one a
+     * session is created, <tt>sessionCreated</tt> is invoked on the handler,
+     * the channel is registered for <tt>OP_READ</tt> with the session as the
+     * attachment, and the request is completed with the session.  If any
+     * step throws, the request is completed with that throwable and the
+     * channel is closed.
+     */
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ;; )
+        {
+            RegistrationRequest pending;
+            synchronized( registerQueue )
+            {
+                pending = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( pending == null )
+            {
+                return;
+            }
+
+            DatagramSession newSession =
+                new DatagramSession( filters, pending.channel, pending.handler );
+
+            boolean registered = false;
+            try
+            {
+                pending.handler.sessionCreated( newSession );
+
+                SelectionKey newKey = pending.channel.register(
+                        selector, SelectionKey.OP_READ, newSession );
+                newSession.setSelectionKey( newKey );
+
+                pending.setSession( newSession );
+                registered = true;
+            }
+            catch( Throwable cause )
+            {
+                pending.setException( cause );
+            }
+            finally
+            {
+                if( !registered )
+                {
+                    try
+                    {
+                        pending.channel.close();
+                    }
+                    catch( IOException closeFailure )
+                    {
+                        exceptionMonitor.exceptionCaught( this, closeFailure );
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes every session queued for cancellation: its channel is closed
+     * (a failure is reported to the exception monitor), the session is
+     * notified of the close, its key is cancelled and the selector is woken
+     * up again.
+     */
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ;; )
+        {
+            DatagramSession closing;
+            synchronized( cancelQueue )
+            {
+                closing = ( DatagramSession ) cancelQueue.pop();
+            }
+
+            if( closing == null )
+            {
+                return;
+            }
+
+            SelectionKey closingKey = closing.getSelectionKey();
+            DatagramChannel closingChannel =
+                ( DatagramChannel ) closingKey.channel();
+            try
+            {
+                closingChannel.close();
+            }
+            catch( IOException cause )
+            {
+                exceptionMonitor.exceptionCaught( this, cause );
+            }
+
+            closing.notifyClose();
+            closingKey.cancel();
+            selector.wakeup(); // wake up again to trigger thread death
+        }
+    }
+
+    /**
+     * Returns the filter chain that is passed to every session created by
+     * this connector.
+     */
+    public IoFilterChain getFilterChain()
+    {
+        final IoFilterChain chain = filters;
+        return chain;
+    }
+
+    /**
+     * The {@link ConnectFuture} returned by <tt>connect()</tt>.  It also
+     * carries the channel and handler that <tt>registerNew()</tt> needs to
+     * create and register the session.
+     */
+    private static class RegistrationRequest extends ConnectFuture
+    {
+        /** Channel to register with the selector. */
+        private final DatagramChannel channel;
+
+        /** Handler for the session created from this request. */
+        private final IoHandler handler;
+
+        private RegistrationRequest( DatagramChannel channel,
+                                     IoHandler handler )
+        {
+            this.handler = handler;
+            this.channel = channel;
+        }
+    }
+
+    /**
+     * Returns a placeholder configuration.  A new
+     * {@link BaseIoConnectorConfig} is created on every call, so a change
+     * made to one returned instance is not visible through a later call.
+     */
+    public IoSessionManagerConfig getConfig()
+    {
+        final IoSessionManagerConfig dummyConfig = new BaseIoConnectorConfig();
+        return dummyConfig;
+    }
+}

Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java (from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java&r1=326451&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSession.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSession.java Wed Oct 19 01:56:19 2005
@@ -16,7 +16,7 @@
  *   limitations under the License.
  *
  */
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
 
 import java.net.SocketAddress;
 import java.nio.channels.DatagramChannel;
@@ -50,7 +50,7 @@
 
     private final DatagramChannel ch;
 
-    private final DatagramSessionConfig config;
+    private final DatagramSessionConfigImpl config;
 
     private final Queue writeRequestQueue;
 
@@ -73,7 +73,7 @@
         this.managerFilterChain = managerFilterChain;
         this.filterChain = new IoSessionFilterChain( this, managerFilterChain );
         this.ch = ch;
-        this.config = new DatagramSessionConfig( ch );
+        this.config = new DatagramSessionConfigImpl( ch );
         this.writeRequestQueue = new Queue();
         this.handler = defaultHandler;
         this.remoteAddress = ch.socket().getRemoteSocketAddress();
@@ -110,7 +110,7 @@
         return handler;
     }
     
-    public void notifyClose()
+    void notifyClose()
     {
         closeFuture.setClosed( true );
     }
@@ -120,9 +120,9 @@
         if( !closeFuture.isReady() )
         {
             IoSessionManager manager = managerFilterChain.getManager();
-            if( manager instanceof DatagramConnector )
+            if( manager instanceof DatagramConnectorImpl )
             {
-                ( ( DatagramConnector ) manager ).closeSession( this );
+                ( ( DatagramConnectorImpl ) manager ).closeSession( this );
             }
             else
             {

Added: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java?rev=326489&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionConfigImpl.java Wed Oct 19 01:56:19 2005
@@ -0,0 +1,68 @@
+/*
+ *   @(#) $Id: DatagramSessionConfig.java 326451 2005-10-19 08:07:09Z trustin $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio.support;
+
+import java.net.SocketException;
+import java.nio.channels.DatagramChannel;
+
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoSessionConfig;
+import org.apache.mina.common.support.BaseIoSessionConfig;
+import org.apache.mina.transport.socket.nio.DatagramAcceptor;
+import org.apache.mina.transport.socket.nio.DatagramConnector;
+import org.apache.mina.transport.socket.nio.DatagramSessionConfig;
+
+/**
+ * {@link DatagramSessionConfig} implementation that reads and writes each
+ * setting directly on the socket of the wrapped {@link DatagramChannel}.
+ * It is the {@link IoSessionConfig} of datagram (UDP/IP) sessions; see
+ * {@link IoSession#getConfig()}, {@link DatagramAcceptor} and
+ * {@link DatagramConnector}.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+class DatagramSessionConfigImpl extends BaseIoSessionConfig implements DatagramSessionConfig
+{
+    /** Channel whose socket options are exposed by this configuration. */
+    private final DatagramChannel channel;
+
+    DatagramSessionConfigImpl( DatagramChannel ch )
+    {
+        this.channel = ch;
+    }
+
+    /** Returns the socket's current reuse-address setting. */
+    public boolean getReuseAddress() throws SocketException
+    {
+        final java.net.DatagramSocket socket = channel.socket();
+        return socket.getReuseAddress();
+    }
+
+    /** Sets the socket's reuse-address setting to <tt>on</tt>. */
+    public void setReuseAddress( boolean on ) throws SocketException
+    {
+        final java.net.DatagramSocket socket = channel.socket();
+        socket.setReuseAddress( on );
+    }
+
+    /** Returns the socket's current traffic class. */
+    public int getTrafficClass() throws SocketException
+    {
+        final java.net.DatagramSocket socket = channel.socket();
+        return socket.getTrafficClass();
+    }
+
+    /** Sets the socket's traffic class to <tt>tc</tt>. */
+    public void setTrafficClass( int tc ) throws SocketException
+    {
+        final java.net.DatagramSocket socket = channel.socket();
+        socket.setTrafficClass( tc );
+    }
+}
\ No newline at end of file

Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java (from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java&r1=326451&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManager.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManager.java Wed Oct 19 01:56:19 2005
@@ -16,29 +16,27 @@
  *   limitations under the License.
  *
  */
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
 
-import org.apache.mina.common.support.BaseIoSessionManager;
+import org.apache.mina.common.IoSessionManager;
 
 /**
- * A base class for {@link DatagramAcceptor} and {@link DatagramConnector}.
- * Session interacts with this abstract class instead of those two concrete
- * classes.
+ * A base interface for {@link DatagramAcceptorImpl} and {@link DatagramConnectorImpl}.
  * 
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$
  */
-abstract class DatagramSessionManager extends BaseIoSessionManager
+interface DatagramSessionManager extends IoSessionManager
 {
     /**
      * Requests this processor to flush the write buffer of the specified
      * session.  This method is invoked by MINA internally.
      */
-    abstract void flushSession( DatagramSession session );
+    void flushSession( DatagramSession session );
 
     /**
      * Requests this processor to close the specified session.
      * This method is invoked by MINA internally.
      */
-    abstract void closeSession( DatagramSession session );
+    void closeSession( DatagramSession session );
 }

Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java (from r326451, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java&r1=326451&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/DatagramSessionManagerFilterChain.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/DatagramSessionManagerFilterChain.java Wed Oct 19 01:56:19 2005
@@ -1,4 +1,4 @@
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
 
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;

Copied: directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (from r326410, directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java)
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java?p2=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java&p1=directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java&r1=326410&r2=326489&rev=326489&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java Wed Oct 19 01:56:19 2005
@@ -16,7 +16,7 @@
  *   limitations under the License.
  *
  */
-package org.apache.mina.transport.socket.nio;
+package org.apache.mina.transport.socket.nio.support;
 
 import java.io.IOException;
 import java.nio.channels.CancelledKeyException;
@@ -31,6 +31,7 @@
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.WriteTimeoutException;
 import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.Queue;
 
 /**
@@ -40,7 +41,7 @@
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$,
  */
-class SocketIoProcessor
+public class SocketIoProcessor
 {
     private static final SocketIoProcessor instance;
 
@@ -82,12 +83,12 @@
         selector = Selector.open();
     }
 
-    static SocketIoProcessor getInstance()
+    public static SocketIoProcessor getInstance()
     {
         return instance;
     }
 
-    void addSession( SocketSession session )
+    public void addSession( SocketSession session )
     {
         synchronized( this )
         {