You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by pr...@apache.org on 2006/04/29 01:36:29 UTC

svn commit: r398039 - in /directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio: ./ support/

Author: proyal
Date: Fri Apr 28 16:36:27 2006
New Revision: 398039

URL: http://svn.apache.org/viewcvs?rev=398039&view=rev
Log:
DIRMINA-209 - The number of processors for socket connectors and acceptors can be specified via a cxtor parameter. By default, connectors and acceptors each use their own processor.
Additionally, the delegates have been removed helping to simplify the code.

Added:
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java   (contents, props changed)
      - copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java   (contents, props changed)
      - copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java   (contents, props changed)
      - copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java   (contents, props changed)
      - copied, changed from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
Removed:
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketAcceptorDelegate.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketConnectorDelegate.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java
Modified:
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java
    directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java?rev=398039&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptor.java Fri Apr 28 16:36:27 2006
@@ -0,0 +1,651 @@
+/*
+ *   @(#) $Id: SocketAcceptor.java 389042 2006-03-27 07:49:41Z trustin $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoAcceptorConfig;
+import org.apache.mina.common.IoFuture;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.support.BaseIoAcceptor;
+import org.apache.mina.util.IdentityHashSet;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * {@link IoAcceptor} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
+ */
+public class SocketAcceptor extends BaseIoAcceptor
+{
+    /**
+     * @noinspection StaticNonFinalField
+     */
+    private static volatile int nextId = 0;
+
+    private final Object lock = new Object();
+    private final int id = nextId ++;
+    private final String threadName = "SocketAcceptor-" + id;
+    private final IoServiceConfig defaultConfig = new SocketAcceptorConfig();
+    private final Map channels = new HashMap();
+    private final Hashtable sessions = new Hashtable();
+
+    private final Queue registerQueue = new Queue();
+    private final Queue cancelQueue = new Queue();
+
+    private final SocketIoProcessor[] ioProcessors;
+    private final int processorCount;
+
+    /**
+     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+     */
+    private Selector selector;
+    private Worker worker;
+    private int processorDistributor = 0;
+
+    /**
+     * Create an acceptor with a single processing thread
+     */
+    public SocketAcceptor()
+    {
+        this( 1 );
+    }
+
+    /**
+     * Create an acceptor with the desired number of processing threads
+     *
+     * @param processorCount Number of processing threads
+     */
+    public SocketAcceptor( int processorCount )
+    {
+        if( processorCount < 1 )
+        {
+            throw new IllegalArgumentException( "Must have at least one processor" );
+        }
+
+        this.processorCount = processorCount;
+        ioProcessors = new SocketIoProcessor[processorCount];
+
+        for( int i = 0; i < processorCount; i++ )
+        {
+            ioProcessors[i] = new SocketIoProcessor( "SocketAcceptorIoProcessor-" + id + "." + i );
+        }
+    }
+
+    /**
+     * Binds to the specified <code>address</code> and handles incoming connections with the specified
+     * <code>handler</code>.  Backlog value is configured to the value of <code>backlog</code> property.
+     *
+     * @throws IOException if failed to bind
+     */
+    public void bind( SocketAddress address, IoHandler handler, IoServiceConfig config ) throws IOException
+    {
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        if( handler == null )
+        {
+            throw new NullPointerException( "handler" );
+        }
+
+        if( !( address instanceof InetSocketAddress ) )
+        {
+            throw new IllegalArgumentException( "Unexpected address type: " + address.getClass() );
+        }
+
+        if( ( ( InetSocketAddress ) address ).getPort() == 0 )
+        {
+            throw new IllegalArgumentException( "Unsupported port number: 0" );
+        }
+
+        if( config == null )
+        {
+            config = getDefaultConfig();
+        }
+
+        RegistrationRequest request = new RegistrationRequest( address, handler, config );
+
+        synchronized( registerQueue )
+        {
+            registerQueue.push( request );
+        }
+
+        startupWorker();
+
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+            }
+        }
+
+        if( request.exception != null )
+        {
+            throw request.exception;
+        }
+    }
+
+
+    private synchronized void startupWorker() throws IOException
+    {
+        synchronized( lock )
+        {
+            if( worker == null )
+            {
+                selector = Selector.open();
+                worker = new Worker();
+
+                worker.start();
+            }
+        }
+    }
+
+    public Set getManagedSessions( SocketAddress address )
+    {
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        Set managedSessions = ( Set ) sessions.get( address );
+
+        if( managedSessions == null )
+        {
+            throw new IllegalArgumentException( "Address not bound: " + address );
+        }
+
+        return Collections.unmodifiableSet(
+            new IdentityHashSet( Arrays.asList( managedSessions.toArray() ) ) );
+    }
+
+    public void unbind( SocketAddress address )
+    {
+        if( address == null )
+        {
+            throw new NullPointerException( "address" );
+        }
+
+        final Set managedSessions = ( Set ) sessions.get( address );
+        CancellationRequest request = new CancellationRequest( address );
+
+        try
+        {
+            startupWorker();
+        }
+        catch( IOException e )
+        {
+            // IOException is thrown only when Worker thread is not
+            // running and failed to open a selector.  We simply throw
+            // IllegalArgumentException here because we can simply
+            // conclude that nothing is bound to the selector.
+            throw new IllegalArgumentException( "Address not bound: " + address );
+        }
+
+        synchronized( cancelQueue )
+        {
+            cancelQueue.push( request );
+        }
+
+        selector.wakeup();
+
+        synchronized( request )
+        {
+            while( !request.done )
+            {
+                try
+                {
+                    request.wait();
+                }
+                catch( InterruptedException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+            }
+        }
+
+        if( request.exception != null )
+        {
+            request.exception.fillInStackTrace();
+
+            throw request.exception;
+        }
+
+        // Disconnect all clients
+        IoServiceConfig cfg = request.registrationRequest.config;
+        boolean disconnectOnUnbind;
+        if( cfg instanceof IoAcceptorConfig )
+        {
+            disconnectOnUnbind = ( ( IoAcceptorConfig ) cfg ).isDisconnectOnUnbind();
+        }
+        else
+        {
+            disconnectOnUnbind = ( ( IoAcceptorConfig ) getDefaultConfig() ).isDisconnectOnUnbind();
+        }
+
+        if( disconnectOnUnbind && managedSessions != null )
+        {
+            IoSession[] tempSessions =
+                ( IoSession[] ) managedSessions.toArray( new IoSession[ managedSessions.size() ] );
+
+            final Object lock = new Object();
+
+            for( int i = 0; i < tempSessions.length; i++ )
+            {
+                if( !managedSessions.contains( tempSessions[i] ) )
+                {
+                    // The session has already been closed and have been 
+                    // removed from managedSessions by the SocketIoProcessor.
+                    continue;
+                }
+                tempSessions[i].close().setCallback( new IoFuture.Callback()
+                {
+                    public void operationComplete( IoFuture future )
+                    {
+                        synchronized( lock )
+                        {
+                            //noinspection NakedNotify
+                            lock.notifyAll();
+                        }
+                    }
+                } );
+            }
+
+            try
+            {
+                synchronized( lock )
+                {
+                    while( !managedSessions.isEmpty() )
+                    {
+                        lock.wait( 1000 );
+                    }
+                }
+            }
+            catch( InterruptedException ie )
+            {
+                // Ignored
+            }
+
+        }
+    }
+
+    public void unbindAll()
+    {
+        List addresses;
+        synchronized( channels )
+        {
+            addresses = new ArrayList( channels.keySet() );
+        }
+
+        for( Iterator i = addresses.iterator(); i.hasNext(); )
+        {
+            unbind( ( SocketAddress ) i.next() );
+        }
+    }
+
+    public boolean isBound( SocketAddress address )
+    {
+        synchronized( channels )
+        {
+            return channels.containsKey( address );
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        Worker()
+        {
+            super( SocketAcceptor.this.threadName );
+        }
+
+        public void run()
+        {
+            for( ; ; )
+            {
+                try
+                {
+                    int nKeys = selector.select();
+
+                    registerNew();
+                    cancelKeys();
+
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        synchronized( lock )
+                        {
+                            if( selector.keys().isEmpty() &&
+                                registerQueue.isEmpty() &&
+                                cancelQueue.isEmpty() )
+                            {
+                                worker = null;
+                                try
+                                {
+                                    selector.close();
+                                }
+                                catch( IOException e )
+                                {
+                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                }
+                                finally
+                                {
+                                    selector = null;
+                                }
+                                break;
+                            }
+                        }
+                    }
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
+                    }
+                }
+            }
+        }
+
+        private void processSessions( Set keys ) throws IOException
+        {
+            Iterator it = keys.iterator();
+            while( it.hasNext() )
+            {
+                SelectionKey key = ( SelectionKey ) it.next();
+
+                it.remove();
+
+                if( !key.isAcceptable() )
+                {
+                    continue;
+                }
+
+                ServerSocketChannel ssc = ( ServerSocketChannel ) key.channel();
+
+                SocketChannel ch = ssc.accept();
+
+                if( ch == null )
+                {
+                    continue;
+                }
+
+                boolean success = false;
+                SocketSessionImpl session = null;
+                try
+                {
+                    RegistrationRequest req = ( RegistrationRequest ) key.attachment();
+                    session = new SocketSessionImpl( SocketAcceptor.this,
+                                                     nextProcessor(),
+                                                     ( Set ) sessions.get( req.address ),
+                                                     req.config.getSessionConfig(),
+                                                     ch,
+                                                     req.handler,
+                                                     req.address );
+                    getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+                    req.config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+                    req.config.getThreadModel().buildFilterChain( session.getFilterChain() );
+                    ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
+                    session.getManagedSessions().add( session );
+                    session.getIoProcessor().addNew( session );
+                    success = true;
+                }
+                catch( Throwable t )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( t );
+                }
+                finally
+                {
+                    if( !success )
+                    {
+                        if( session != null )
+                        {
+                            session.getManagedSessions().remove( session );
+                        }
+                        ch.close();
+                    }
+                }
+            }
+        }
+    }
+
+    private SocketIoProcessor nextProcessor()
+    {
+        return ioProcessors[processorDistributor++ % processorCount];
+    }
+
+    public IoServiceConfig getDefaultConfig()
+    {
+        return defaultConfig;
+    }
+
+    private void registerNew()
+    {
+        if( registerQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ; ; )
+        {
+            RegistrationRequest req;
+
+            synchronized( registerQueue )
+            {
+                req = ( RegistrationRequest ) registerQueue.pop();
+            }
+
+            if( req == null )
+            {
+                break;
+            }
+
+            ServerSocketChannel ssc = null;
+
+            try
+            {
+                ssc = ServerSocketChannel.open();
+                ssc.configureBlocking( false );
+
+                // Configure the server socket,
+                SocketAcceptorConfig cfg;
+                if( req.config instanceof SocketAcceptorConfig )
+                {
+                    cfg = ( SocketAcceptorConfig ) req.config;
+                }
+                else
+                {
+                    cfg = ( SocketAcceptorConfig ) getDefaultConfig();
+                }
+
+                ssc.socket().setReuseAddress( cfg.isReuseAddress() );
+                ssc.socket().setReceiveBufferSize(
+                    ( ( SocketSessionConfig ) cfg.getSessionConfig() ).getReceiveBufferSize() );
+
+                // and bind.
+                ssc.socket().bind( req.address, cfg.getBacklog() );
+                ssc.register( selector, SelectionKey.OP_ACCEPT, req );
+
+                synchronized( channels )
+                {
+                    channels.put( req.address, ssc );
+                }
+                sessions.put( req.address, Collections.synchronizedSet( new HashSet() ) );
+            }
+            catch( IOException e )
+            {
+                req.exception = e;
+            }
+            finally
+            {
+                synchronized( req )
+                {
+                    req.done = true;
+
+                    req.notifyAll();
+                }
+
+                if( ssc != null && req.exception != null )
+                {
+                    try
+                    {
+                        ssc.close();
+                    }
+                    catch( IOException e )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e );
+                    }
+                }
+            }
+        }
+    }
+
+
+    private void cancelKeys()
+    {
+        if( cancelQueue.isEmpty() )
+        {
+            return;
+        }
+
+        for( ; ; )
+        {
+            CancellationRequest request;
+
+            synchronized( cancelQueue )
+            {
+                request = ( CancellationRequest ) cancelQueue.pop();
+            }
+
+            if( request == null )
+            {
+                break;
+            }
+
+            sessions.remove( request.address );
+            ServerSocketChannel ssc;
+            synchronized( channels )
+            {
+                ssc = ( ServerSocketChannel ) channels.remove( request.address );
+            }
+
+            // close the channel
+            try
+            {
+                if( ssc == null )
+                {
+                    request.exception = new IllegalArgumentException( "Address not bound: " + request.address );
+                }
+                else
+                {
+                    SelectionKey key = ssc.keyFor( selector );
+                    request.registrationRequest = ( RegistrationRequest ) key.attachment();
+                    key.cancel();
+
+                    selector.wakeup(); // wake up again to trigger thread death
+
+                    ssc.close();
+                }
+            }
+            catch( IOException e )
+            {
+                ExceptionMonitor.getInstance().exceptionCaught( e );
+            }
+            finally
+            {
+                synchronized( request )
+                {
+                    request.done = true;
+                    request.notifyAll();
+                }
+            }
+        }
+    }
+
+    private static class RegistrationRequest
+    {
+        private final SocketAddress address;
+        private final IoHandler handler;
+        private final IoServiceConfig config;
+        private IOException exception;
+        private boolean done;
+
+        private RegistrationRequest( SocketAddress address, IoHandler handler, IoServiceConfig config )
+        {
+            this.address = address;
+            this.handler = handler;
+            this.config = config;
+        }
+    }
+
+
+    private static class CancellationRequest
+    {
+        private final SocketAddress address;
+        private boolean done;
+        private RegistrationRequest registrationRequest;
+        private RuntimeException exception;
+
+        private CancellationRequest( SocketAddress address )
+        {
+            this.address = address;
+        }
+    }
+}

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java?rev=398039&r1=398038&r2=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketAcceptorConfig.java Fri Apr 28 16:36:27 2006
@@ -18,15 +18,14 @@
  */
 package org.apache.mina.transport.socket.nio;
 
-import java.io.IOException;
-import java.net.ServerSocket;
-
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoAcceptorConfig;
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.support.BaseIoAcceptorConfig;
-import org.apache.mina.transport.socket.nio.support.SocketSessionConfigImpl;
+
+import java.io.IOException;
+import java.net.ServerSocket;
 
 /**
  * An {@link IoAcceptorConfig} for {@link SocketAcceptor}.
@@ -39,7 +38,7 @@
     private IoSessionConfig sessionConfig = new SocketSessionConfigImpl();
     private int backlog = 50;
     private boolean reuseAddress;
-    
+
     /**
      * Creates a new instance.
      * 
@@ -85,7 +84,7 @@
     {
         return reuseAddress;
     }
-    
+
     /**
      * @see ServerSocket#setReuseAddress(boolean)
      */
@@ -93,12 +92,12 @@
     {
         this.reuseAddress = reuseAddress;
     }
-    
+
     public int getBacklog()
     {
         return backlog;
     }
-    
+
     public void setBacklog( int backlog )
     {
         this.backlog = backlog;

Added: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java?rev=398039&view=auto
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java (added)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnector.java Fri Apr 28 16:36:27 2006
@@ -0,0 +1,466 @@
+/*
+ *   @(#) $Id: SocketConnector.java 389042 2006-03-27 07:49:41Z trustin $
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ConnectFuture;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IoConnector;
+import org.apache.mina.common.IoConnectorConfig;
+import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.common.support.BaseIoConnector;
+import org.apache.mina.util.Queue;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link IoConnector} for socket transport (TCP/IP).
+ *
+ * @author The Apache Directory Project (mina-dev@directory.apache.org)
+ * @version $Rev: 389042 $, $Date: 2006-03-27 07:49:41Z $
+ */
+public class SocketConnector extends BaseIoConnector
+{
+    /**
+     * @noinspection StaticNonFinalField
+     */
+    private static volatile int nextId = 0;
+
+    private final Object lock = new Object();
+    private final int id = nextId++;
+    private final String threadName = "SocketConnector-" + id;
+    private final IoServiceConfig defaultConfig = new SocketConnectorConfig();
+    private final Queue connectQueue = new Queue();
+    private final Set managedSessions = Collections.synchronizedSet( new HashSet() );
+    private final SocketIoProcessor[] ioProcessors;
+    private final int processorCount;
+
+    /**
+     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
+     */
+    private Selector selector;
+    private Worker worker;
+    private int processorDistributor = 0;
+    private long workerTimeout = 1000L * 60;
+
+    /**
+     * Create a connector with a single processing thread
+     */
+    public SocketConnector()
+    {
+        this( 1 );
+    }
+
+    /**
+     * Create a connector with the desired number of processing threads
+     *
+     * @param processorCount Number of processing threads
+     */
+    public SocketConnector( int processorCount )
+    {
+        if( processorCount < 1 )
+        {
+            throw new IllegalArgumentException( "Must have at least one processor" );
+        }
+
+        this.processorCount = processorCount;
+        ioProcessors = new SocketIoProcessor[processorCount];
+
+        for( int i = 0; i < processorCount; i++ )
+        {
+            ioProcessors[i] = new SocketIoProcessor( "SocketConnectorIoProcessor-" + id + "." + i );
+        }
+    }
+
+    /**
+     * How many milliseconds to keep the connection thread alive between connection requests
+     *
+     * @return Number of milliseconds to keep connection thread alive
+     */
+    public long getWorkerTimeout()
+    {
+        return workerTimeout;
+    }
+
+    /**
+     * Set how many milliseconds the connection worker thread should remain alive once idle before terminating itself.
+     *
+     * @param workerTimeout Number of milliseconds to keep thread alive. Must be >=0
+     */
+    public void setWorkerTimeout( long workerTimeout )
+    {
+        if( workerTimeout < 0 )
+        {
+            throw new IllegalArgumentException( "Must be > 0" );
+        }
+        this.workerTimeout = workerTimeout;
+    }
+
+    public ConnectFuture connect( SocketAddress address, IoHandler handler, IoServiceConfig config )
+    {
+        return connect( address, null, handler, config );
+    }
+
+    public ConnectFuture connect( SocketAddress address, SocketAddress localAddress,
+                                  IoHandler handler, IoServiceConfig config )
+    {
+        if( address == null )
+            throw new NullPointerException( "address" );
+        if( handler == null )
+            throw new NullPointerException( "handler" );
+
+        if( ! ( address instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected address type: "
+                                                + address.getClass() );
+
+        if( localAddress != null && !( localAddress instanceof InetSocketAddress ) )
+            throw new IllegalArgumentException( "Unexpected local address type: "
+                                                + localAddress.getClass() );
+
+        if( config == null )
+        {
+            config = getDefaultConfig();
+        }
+
+        SocketChannel ch = null;
+        boolean success = false;
+        try
+        {
+            ch = SocketChannel.open();
+            ch.socket().setReuseAddress( true );
+            if( localAddress != null )
+            {
+                ch.socket().bind( localAddress );
+            }
+
+            ch.configureBlocking( false );
+
+            if( ch.connect( address ) )
+            {
+                SocketSessionImpl session = newSession( ch, handler, config );
+                success = true;
+                ConnectFuture future = new ConnectFuture();
+                future.setSession( session );
+                return future;
+            }
+
+            success = true;
+        }
+        catch( IOException e )
+        {
+            return ConnectFuture.newFailedFuture( e );
+        }
+        finally
+        {
+            if( !success && ch != null )
+            {
+                try
+                {
+                    ch.close();
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                }
+            }
+        }
+
+        ConnectionRequest request = new ConnectionRequest( ch, handler, config );
+        synchronized( lock )
+        {
+            try
+            {
+                startupWorker();
+            }
+            catch( IOException e )
+            {
+                try
+                {
+                    ch.close();
+                }
+                catch( IOException e2 )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e2 );
+                }
+
+                return ConnectFuture.newFailedFuture( e );
+            }
+        }
+
+        synchronized( connectQueue )
+        {
+            connectQueue.push( request );
+        }
+        selector.wakeup();
+
+        return request;
+    }
+
+    public IoServiceConfig getDefaultConfig()
+    {
+        return defaultConfig;
+    }
+
+    private synchronized void startupWorker() throws IOException
+    {
+        if( worker == null )
+        {
+            selector = Selector.open();
+            worker = new Worker();
+            worker.start();
+        }
+    }
+
+    private void registerNew()
+    {
+        if( connectQueue.isEmpty() )
+            return;
+
+        for( ; ; )
+        {
+            ConnectionRequest req;
+            synchronized( connectQueue )
+            {
+                req = ( ConnectionRequest ) connectQueue.pop();
+            }
+
+            if( req == null )
+                break;
+
+            SocketChannel ch = req.channel;
+            try
+            {
+                ch.register( selector, SelectionKey.OP_CONNECT, req );
+            }
+            catch( IOException e )
+            {
+                req.setException( e );
+            }
+        }
+    }
+
+    private void processSessions( Set keys )
+    {
+        Iterator it = keys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+
+            if( !key.isConnectable() )
+                continue;
+
+            SocketChannel ch = ( SocketChannel ) key.channel();
+            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+            boolean success = false;
+            try
+            {
+                ch.finishConnect();
+                SocketSessionImpl session = newSession( ch, entry.handler, entry.config );
+                entry.setSession( session );
+                success = true;
+            }
+            catch( Throwable e )
+            {
+                entry.setException( e );
+            }
+            finally
+            {
+                key.cancel();
+                if( !success )
+                {
+                    try
+                    {
+                        ch.close();
+                    }
+                    catch( IOException e )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e );
+                    }
+                }
+            }
+        }
+
+        keys.clear();
+    }
+
+    private void processTimedOutSessions( Set keys )
+    {
+        long currentTime = System.currentTimeMillis();
+        Iterator it = keys.iterator();
+
+        while( it.hasNext() )
+        {
+            SelectionKey key = ( SelectionKey ) it.next();
+
+            if( !key.isValid() )
+                continue;
+
+            ConnectionRequest entry = ( ConnectionRequest ) key.attachment();
+
+            if( currentTime >= entry.deadline )
+            {
+                entry.setException( new ConnectException() );
+                key.cancel();
+            }
+        }
+    }
+
+    private SocketSessionImpl newSession( SocketChannel ch, IoHandler handler, IoServiceConfig config )
+        throws IOException
+    {
+        SocketSessionImpl session = new SocketSessionImpl( this,
+                                                           nextProcessor(),
+                                                           managedSessions,
+                                                           config.getSessionConfig(),
+                                                           ch,
+                                                           handler,
+                                                           ch.socket().getRemoteSocketAddress() );
+        try
+        {
+            getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+            config.getFilterChainBuilder().buildFilterChain( session.getFilterChain() );
+            config.getThreadModel().buildFilterChain( session.getFilterChain() );
+            ( ( SocketFilterChain ) session.getFilterChain() ).sessionCreated( session );
+        }
+        catch( Throwable e )
+        {
+            throw ( IOException ) new IOException( "Failed to create a session." ).initCause( e );
+        }
+        session.getManagedSessions().add( session );
+        session.getIoProcessor().addNew( session );
+        return session;
+    }
+
+    private SocketIoProcessor nextProcessor()
+    {
+        return ioProcessors[processorDistributor++ % processorCount];
+    }
+
+    private class Worker extends Thread
+    {
+        private long lastActive = System.currentTimeMillis();
+
+        Worker()
+        {
+            super( SocketConnector.this.threadName );
+        }
+
+        public void run()
+        {
+            for( ; ; )
+            {
+                try
+                {
+                    int nKeys = selector.select( 1000 );
+
+                    registerNew();
+
+                    if( nKeys > 0 )
+                    {
+                        processSessions( selector.selectedKeys() );
+                    }
+
+                    processTimedOutSessions( selector.keys() );
+
+                    if( selector.keys().isEmpty() )
+                    {
+                        if( System.currentTimeMillis() - lastActive > workerTimeout )
+                        {
+                            synchronized( lock )
+                            {
+                                if( selector.keys().isEmpty() &&
+                                    connectQueue.isEmpty() )
+                                {
+                                    worker = null;
+                                    try
+                                    {
+                                        selector.close();
+                                    }
+                                    catch( IOException e )
+                                    {
+                                        ExceptionMonitor.getInstance().exceptionCaught( e );
+                                    }
+                                    finally
+                                    {
+                                        selector = null;
+                                    }
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                    else
+                    {
+                        lastActive = System.currentTimeMillis();
+                    }
+                }
+                catch( IOException e )
+                {
+                    ExceptionMonitor.getInstance().exceptionCaught( e );
+
+                    try
+                    {
+                        Thread.sleep( 1000 );
+                    }
+                    catch( InterruptedException e1 )
+                    {
+                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
+                    }
+                }
+            }
+        }
+    }
+
+    private class ConnectionRequest extends ConnectFuture
+    {
+        private final SocketChannel channel;
+        private final long deadline;
+        private final IoHandler handler;
+        private final IoServiceConfig config;
+
+        private ConnectionRequest( SocketChannel channel, IoHandler handler, IoServiceConfig config )
+        {
+            this.channel = channel;
+            long timeout;
+            if( config instanceof IoConnectorConfig )
+            {
+                timeout = ( ( IoConnectorConfig ) config ).getConnectTimeoutMillis();
+            }
+            else
+            {
+                timeout = ( ( IoConnectorConfig ) getDefaultConfig() ).getConnectTimeoutMillis();
+            }
+            this.deadline = System.currentTimeMillis() + timeout;
+            this.handler = handler;
+            this.config = config;
+        }
+    }
+}
\ No newline at end of file

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java?rev=398039&r1=398038&r2=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketConnectorConfig.java Fri Apr 28 16:36:27 2006
@@ -22,7 +22,6 @@
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.support.BaseIoConnectorConfig;
-import org.apache.mina.transport.socket.nio.support.SocketSessionConfigImpl;
 
 /**
  * An {@link IoConnectorConfig} for {@link SocketConnector}.

Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketFilterChain.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java Fri Apr 28 16:36:27 2006
@@ -16,7 +16,7 @@
  *   limitations under the License.
  *
  */
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
 
 import java.io.IOException;
 
@@ -34,7 +34,7 @@
  */
 class SocketFilterChain extends AbstractIoFilterChain {
 
-    public SocketFilterChain( IoSession parent )
+    SocketFilterChain( IoSession parent )
     {
         super( parent );
     }

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketFilterChain.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketIoProcessor.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java Fri Apr 28 16:36:27 2006
@@ -16,7 +16,14 @@
  *   limitations under the License.
  *
  */
-package org.apache.mina.transport.socket.nio.support;
+package org.apache.mina.transport.socket.nio;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.ExceptionMonitor;
+import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilter.WriteRequest;
+import org.apache.mina.common.WriteTimeoutException;
+import org.apache.mina.util.Queue;
 
 import java.io.IOException;
 import java.nio.channels.SelectionKey;
@@ -25,50 +32,20 @@
 import java.util.Iterator;
 import java.util.Set;
 
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.WriteTimeoutException;
-import org.apache.mina.common.IoFilter.WriteRequest;
-import org.apache.mina.util.Queue;
-
 /**
- * Performs all I/O operations for sockets which is connected or bound.
- * This class is used by MINA internally.
- * 
+ * Performs all I/O operations for sockets which is connected or bound. This class is used by MINA internally.
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$,
  */
 class SocketIoProcessor
 {
-    private static final String PROCESSORS_PROPERTY = "mina.socket.processors";
-    private static final String THREAD_PREFIX = "SocketIoProcessor-";
-    private static final int DEFAULT_PROCESSORS = 1;
-    private static final int PROCESSOR_COUNT;
-    private static final SocketIoProcessor[] PROCESSORS;
-    
-    private static int nextId;
-    
-    static
-    {
-        PROCESSOR_COUNT = configureProcessorCount(); 
-        PROCESSORS = createProcessors();
-    }
-      
+    private final Object lock = new Object();
+
+    private final String threadName;
     /**
-     * Returns the {@link SocketIoProcessor} to be used for a newly
-     * created session
-     * 
-     * @return  The processor to be employed
+     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
      */
-    static synchronized SocketIoProcessor getInstance()
-    {
-        SocketIoProcessor processor = PROCESSORS[ nextId ++ ];
-        nextId %= PROCESSOR_COUNT;
-        return processor;
-    }
-      
-    private final String threadName;
     private Selector selector;
 
     private final Queue newSessions = new Queue();
@@ -79,22 +56,20 @@
     private Worker worker;
     private long lastIdleCheckTime = System.currentTimeMillis();
 
-    private SocketIoProcessor( String threadName )
+    SocketIoProcessor( String threadName )
     {
         this.threadName = threadName;
     }
 
     void addNew( SocketSessionImpl session ) throws IOException
     {
-        synchronized( this )
+        synchronized( newSessions )
         {
-            synchronized( newSessions )
-            {
-                newSessions.push( session );
-            }
-            startupWorker();
+            newSessions.push( session );
         }
 
+        startupWorker();
+
         selector.wakeup();
     }
 
@@ -105,13 +80,16 @@
         selector.wakeup();
     }
 
-    private synchronized void startupWorker() throws IOException
+    private void startupWorker() throws IOException
     {
-        if( worker == null )
+        synchronized( lock )
         {
-            selector = Selector.open();
-            worker = new Worker();
-            worker.start();
+            if( worker == null )
+            {
+                selector = Selector.open();
+                worker = new Worker();
+                worker.start();
+            }
         }
     }
 
@@ -164,10 +142,10 @@
         if( newSessions.isEmpty() )
             return;
 
-        SocketSessionImpl session;
-
-        for( ;; )
+        for( ; ; )
         {
+            SocketSessionImpl session;
+
             synchronized( newSessions )
             {
                 session = ( SocketSessionImpl ) newSessions.pop();
@@ -206,7 +184,7 @@
         if( removingSessions.isEmpty() )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
             SocketSessionImpl session;
 
@@ -277,16 +255,16 @@
 
     private void read( SocketSessionImpl session )
     {
-        ByteBuffer buf = ByteBuffer.allocate( session.getReadBufferSize() ); 
+        ByteBuffer buf = ByteBuffer.allocate( session.getReadBufferSize() );
         SocketChannel ch = session.getChannel();
 
         try
         {
+            buf.clear();
+
             int readBytes = 0;
             int ret;
 
-            buf.clear();
-
             try
             {
                 while( ( ret = ch.read( buf.buf() ) ) > 0 )
@@ -347,28 +325,28 @@
     private void notifyIdleness( SocketSessionImpl session, long currentTime )
     {
         notifyIdleness0(
-                session, currentTime,
-                session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
-                IdleStatus.BOTH_IDLE,
-                Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+            session, currentTime,
+            session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
+            IdleStatus.BOTH_IDLE,
+            Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
         notifyIdleness0(
-                session, currentTime,
-                session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
-                IdleStatus.READER_IDLE,
-                Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+            session, currentTime,
+            session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
+            IdleStatus.READER_IDLE,
+            Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
         notifyIdleness0(
-                session, currentTime,
-                session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
-                IdleStatus.WRITER_IDLE,
-                Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+            session, currentTime,
+            session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
+            IdleStatus.WRITER_IDLE,
+            Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
 
         notifyWriteTimeout( session, currentTime, session
-                .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+            .getWriteTimeoutInMillis(), session.getLastWriteTime() );
     }
 
     private void notifyIdleness0( SocketSessionImpl session, long currentTime,
-                                    long idleTime, IdleStatus status,
-                                    long lastIoTime )
+                                  long idleTime, IdleStatus status,
+                                  long lastIoTime )
     {
         if( idleTime > 0 && lastIoTime != 0
             && ( currentTime - lastIoTime ) >= idleTime )
@@ -379,8 +357,8 @@
     }
 
     private void notifyWriteTimeout( SocketSessionImpl session,
-                                           long currentTime,
-                                           long writeTimeout, long lastIoTime )
+                                     long currentTime,
+                                     long writeTimeout, long lastIoTime )
     {
         SelectionKey key = session.getSelectionKey();
         if( writeTimeout > 0
@@ -397,7 +375,7 @@
         if( flushingSessions.size() == 0 )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
             SocketSessionImpl session;
 
@@ -414,7 +392,7 @@
                 releaseWriteBuffers( session );
                 continue;
             }
-            
+
             SelectionKey key = session.getSelectionKey();
             // Retry later if session is not yet fully initialized.
             // (In case that Session.write() is called before addSession() is processed)
@@ -440,12 +418,12 @@
             }
         }
     }
-    
+
     private void releaseWriteBuffers( SocketSessionImpl session )
     {
         Queue writeRequestQueue = session.getWriteRequestQueue();
         WriteRequest req;
-        
+
         while( ( req = ( WriteRequest ) writeRequestQueue.pop() ) != null )
         {
             try
@@ -472,9 +450,10 @@
         SocketChannel ch = session.getChannel();
         Queue writeRequestQueue = session.getWriteRequestQueue();
 
-        WriteRequest req;
-        for( ;; )
+        for( ; ; )
         {
+            WriteRequest req;
+
             synchronized( writeRequestQueue )
             {
                 req = ( WriteRequest ) writeRequestQueue.first();
@@ -490,9 +469,9 @@
                 {
                     writeRequestQueue.pop();
                 }
-                
+
                 session.increaseWrittenWriteRequests();
-                
+
                 buf.reset();
                 ( ( SocketFilterChain ) session.getFilterChain() ).messageSent( session, req );
                 continue;
@@ -513,12 +492,12 @@
         }
     }
 
-    private void doUpdateTrafficMask() 
+    private void doUpdateTrafficMask()
     {
         if( trafficControllingSessions.isEmpty() )
             return;
 
-        for( ;; )
+        for( ; ; )
         {
             SocketSessionImpl session;
 
@@ -562,65 +541,25 @@
             key.interestOps( ops & mask );
         }
     }
-    
-    /**
-     * Configures the number of processors employed.
-     * We first check for a system property "mina.IoProcessors". If this
-     * property is present and can be interpreted as an integer value greater 
-     * or equal to 1, this value is used as the number of processors.
-     * Otherwise a default of 1 processor is employed.
-     * 
-     * @return  The nubmer of processors to employ
-     */
-    private static int configureProcessorCount() 
-    {
-        int processors = DEFAULT_PROCESSORS;
-        String processorProperty = System.getProperty( PROCESSORS_PROPERTY );
-        if ( processorProperty != null ) 
-        {
-            try 
-            {
-                processors = Integer.parseInt( processorProperty );
-            } 
-            catch ( NumberFormatException e )
-            {
-                ExceptionMonitor.getInstance().exceptionCaught( e );
-            }
-            processors = Math.max( processors, 1 );
-            
-            System.setProperty( PROCESSORS_PROPERTY, String.valueOf( processors ) );
-        }
 
-        return processors;
-    }
-    
-    private static SocketIoProcessor[] createProcessors()
-    {
-        SocketIoProcessor[] processors = new SocketIoProcessor[ PROCESSOR_COUNT ];
-        for ( int i = 0; i < PROCESSOR_COUNT; i ++ )
-        {
-            processors[i] = new SocketIoProcessor( THREAD_PREFIX + i );
-        }
-        return processors;
-    }
-    
+
     private class Worker extends Thread
     {
-        public Worker()
+        Worker()
         {
             super( SocketIoProcessor.this.threadName );
         }
 
         public void run()
         {
-            for( ;; )
+            for( ; ; )
             {
                 try
                 {
                     int nKeys = selector.select( 1000 );
                     doAddNew();
                     doUpdateTrafficMask();
-                    
+
                     if( nKeys > 0 )
                     {
                         process( selector.selectedKeys() );
@@ -632,12 +571,12 @@
 
                     if( selector.keys().isEmpty() )
                     {
-                        synchronized( SocketIoProcessor.this )
+                        synchronized( lock )
                         {
-                            if( selector.keys().isEmpty() &&
-                                newSessions.isEmpty() )
+                            if( selector.keys().isEmpty() && newSessions.isEmpty() )
                             {
                                 worker = null;
+
                                 try
                                 {
                                     selector.close();
@@ -650,6 +589,7 @@
                                 {
                                     selector = null;
                                 }
+
                                 break;
                             }
                         }
@@ -665,10 +605,11 @@
                     }
                     catch( InterruptedException e1 )
                     {
+                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
                     }
                 }
             }
         }
     }
-    
+
 }

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketIoProcessor.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionConfigImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java Fri Apr 28 16:36:27 2006
@@ -16,17 +16,15 @@
  *   limitations under the License.
  *
  */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketException;
+package org.apache.mina.transport.socket.nio;
 
 import org.apache.mina.common.ExceptionMonitor;
 import org.apache.mina.common.IoConnectorConfig;
 import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketConnector;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketException;
 
 /**
  * An {@link IoConnectorConfig} for {@link SocketConnector}.

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionConfigImpl.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Copied: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java (from r397283, directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java)
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java?p2=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java&p1=directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java&r1=397283&r2=398039&rev=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/support/SocketSessionImpl.java (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java Fri Apr 28 16:36:27 2006
@@ -16,14 +16,9 @@
  *   limitations under the License.
  *
  */
-package org.apache.mina.transport.socket.nio.support;
-
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.util.Set;
+package org.apache.mina.transport.socket.nio;
 
+import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandler;
 import org.apache.mina.common.IoService;
@@ -31,15 +26,19 @@
 import org.apache.mina.common.IoSessionConfig;
 import org.apache.mina.common.RuntimeIOException;
 import org.apache.mina.common.TransportType;
-import org.apache.mina.common.IoFilter.WriteRequest;
 import org.apache.mina.common.support.BaseIoSession;
 import org.apache.mina.common.support.BaseIoSessionConfig;
-import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.mina.util.Queue;
 
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Set;
+
 /**
  * An {@link IoSession} for socket transport (TCP/IP).
- * 
+ *
  * @author The Apache Directory Project (mina-dev@directory.apache.org)
  * @version $Rev$, $Date$
  */
@@ -55,22 +54,24 @@
     private final SocketAddress remoteAddress;
     private final SocketAddress localAddress;
     private final SocketAddress serviceAddress;
-    private final Set managedSessions;    
+    private final Set managedSessions;
     private SelectionKey key;
     private int readBufferSize;
 
     /**
      * Creates a new instance.
      */
-    public SocketSessionImpl(
-            IoService manager, Set managedSessions,
-            IoSessionConfig config,
-            SocketChannel ch, IoHandler defaultHandler,
-            SocketAddress serviceAddress )
+    SocketSessionImpl( IoService manager,
+                       SocketIoProcessor ioProcessor,
+                       Set managedSessions,
+                       IoSessionConfig config,
+                       SocketChannel ch,
+                       IoHandler defaultHandler,
+                       SocketAddress serviceAddress )
     {
         this.manager = manager;
         this.managedSessions = managedSessions;
-        this.ioProcessor = SocketIoProcessor.getInstance();
+        this.ioProcessor = ioProcessor;
         this.filterChain = new SocketFilterChain( this );
         this.ch = ch;
         this.writeRequestQueue = new Queue();
@@ -78,7 +79,7 @@
         this.remoteAddress = ch.socket().getRemoteSocketAddress();
         this.localAddress = ch.socket().getLocalSocketAddress();
         this.serviceAddress = serviceAddress;
-        
+
         // Apply the initial session settings
         if( config instanceof SocketSessionConfig )
         {
@@ -98,22 +99,22 @@
             }
         }
     }
-    
+
     public IoService getService()
     {
         return manager;
     }
-    
+
     public IoSessionConfig getConfig()
     {
         return config;
     }
-    
+
     SocketIoProcessor getIoProcessor()
     {
         return ioProcessor;
     }
-    
+
     public IoFilterChain getFilterChain()
     {
         return filterChain;
@@ -143,12 +144,12 @@
     {
         return handler;
     }
-    
+
     protected void close0()
     {
         filterChain.filterClose( this );
     }
-    
+
     Queue getWriteRequestQueue()
     {
         return writeRequestQueue;
@@ -169,7 +170,7 @@
             return writeRequestQueue.byteSize();
         }
     }
-    
+
     protected void write0( WriteRequest writeRequest )
     {
         filterChain.filterWrite( this, writeRequest );
@@ -189,17 +190,17 @@
     {
         return localAddress;
     }
-    
+
     public SocketAddress getServiceAddress()
     {
         return serviceAddress;
     }
-    
+
     protected void updateTrafficMask()
     {
         this.ioProcessor.updateTrafficMask( this );
     }
-    
+
     int getReadBufferSize()
     {
         return readBufferSize;
@@ -218,7 +219,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public void setKeepAlive( boolean on )
         {
             try
@@ -230,7 +231,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public boolean isOobInline()
         {
             try
@@ -242,7 +243,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public void setOobInline( boolean on )
         {
             try
@@ -254,7 +255,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public boolean isReuseAddress()
         {
             try
@@ -266,7 +267,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public void setReuseAddress( boolean on )
         {
             try
@@ -278,7 +279,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public int getSoLinger()
         {
             try
@@ -290,7 +291,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public void setSoLinger( int linger )
         {
             try
@@ -309,7 +310,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public boolean isTcpNoDelay()
         {
             try
@@ -321,7 +322,7 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public void setTcpNoDelay( boolean on )
         {
             try
@@ -333,41 +334,41 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public int getTrafficClass()
         {
-        	if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
-        	{
-	            try
-	            {
-	                return ch.socket().getTrafficClass();
-	            }
-	            catch( SocketException e )
-	            {
-	                throw new RuntimeIOException( e );
-	            }
-        	}
-        	else
-        	{
-        		return 0;
-        	}
+            if( SocketSessionConfigImpl.isGetTrafficClassAvailable() )
+            {
+                try
+                {
+                    return ch.socket().getTrafficClass();
+                }
+                catch( SocketException e )
+                {
+                    throw new RuntimeIOException( e );
+                }
+            }
+            else
+            {
+                return 0;
+            }
         }
-    
+
         public void setTrafficClass( int tc )
         {
-        	if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
-        	{
-	            try
-	            {
-	                ch.socket().setTrafficClass( tc );
-	            }
-	            catch( SocketException e )
-	            {
-	                throw new RuntimeIOException( e );
-	            }
-        	}
+            if( SocketSessionConfigImpl.isSetTrafficClassAvailable() )
+            {
+                try
+                {
+                    ch.socket().setTrafficClass( tc );
+                }
+                catch( SocketException e )
+                {
+                    throw new RuntimeIOException( e );
+                }
+            }
         }
-    
+
         public int getSendBufferSize()
         {
             try
@@ -379,22 +380,22 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public void setSendBufferSize( int size )
         {
-        	if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
-        	{
-	            try
-	            {
-	                ch.socket().setSendBufferSize( size );
-	            }
-	            catch( SocketException e )
-	            {
-	                throw new RuntimeIOException( e );
-	            }
-        	}
+            if( SocketSessionConfigImpl.isSetSendBufferSizeAvailable() )
+            {
+                try
+                {
+                    ch.socket().setSendBufferSize( size );
+                }
+                catch( SocketException e )
+                {
+                    throw new RuntimeIOException( e );
+                }
+            }
         }
-    
+
         public int getReceiveBufferSize()
         {
             try
@@ -406,21 +407,21 @@
                 throw new RuntimeIOException( e );
             }
         }
-    
+
         public void setReceiveBufferSize( int size )
         {
-        	if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
-        	{
-	            try
-	            {
-	                ch.socket().setReceiveBufferSize( size );
-	                SocketSessionImpl.this.readBufferSize = size;
-	            }
-	            catch( SocketException e )
-	            {
-	                throw new RuntimeIOException( e );
-	            }
-        	}
+            if( SocketSessionConfigImpl.isSetReceiveBufferSizeAvailable() )
+            {
+                try
+                {
+                    ch.socket().setReceiveBufferSize( size );
+                    SocketSessionImpl.this.readBufferSize = size;
+                }
+                catch( SocketException e )
+                {
+                    throw new RuntimeIOException( e );
+                }
+            }
         }
     }
 }

Propchange: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/SocketSessionImpl.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Modified: directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html
URL: http://svn.apache.org/viewcvs/directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html?rev=398039&r1=398038&r2=398039&view=diff
==============================================================================
--- directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html (original)
+++ directory/trunks/mina/core/src/main/java/org/apache/mina/transport/socket/nio/package.html Fri Apr 28 16:36:27 2006
@@ -3,16 +3,14 @@
 <head>
 </head>
 <body>
-Socket (TCP/IP) and Datagram (UDP/IP) support based on Java <a href="http://java.sun.com/j2se/1.5.0/docs/guide/nio/">NIO (New I/O) API</a>.
+Socket (TCP/IP) and Datagram (UDP/IP) support based on Java
+<a href="http://java.sun.com/j2se/1.5.0/docs/guide/nio/">NIO (New I/O) API</a>.
 
 <h3>Configuring the number of NIO selector loops</h3>
 <p>
 You can specify the number of Socket I/O thread to utilize multi-processors
-efficiently, using a Java system property, '<tt>mina.socket.processors':
-<pre>
-java -Dmina.socket.processors=2 com.example.mina.Main
-</pre>
-The default number of processors is '<tt>1</tt>'.
+efficiently by specifying the number of processing threads in the constructor. The default is <tt>1</tt>
+</p>
 
 </body>
 </html>