You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/04/22 04:50:32 UTC

svn commit: r164162 - in /directory/network/trunk/src/java/org/apache/mina: io/filter/IoThreadPoolFilter.java protocol/filter/ProtocolThreadPoolFilter.java util/AvailablePortFinder.java util/BaseThreadPool.java util/EventType.java util/ThreadPool.java

Author: trustin
Date: Thu Apr 21 19:50:31 2005
New Revision: 164162

URL: http://svn.apache.org/viewcvs?rev=164162&view=rev
Log:
* Refactored thread pool filters
** Added ThreadPool interface that thread pool filters implement
** Added BaseThreadPool (implements ThreadPool) that thread pool filters extend

Added:
    directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java   (with props)
    directory/network/trunk/src/java/org/apache/mina/util/EventType.java
    directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java   (with props)
Modified:
    directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java
    directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java
    directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java

Modified: directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java?rev=164162&r1=164161&r2=164162&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java Thu Apr 21 19:50:31 2005
@@ -18,67 +18,29 @@
  */
 package org.apache.mina.io.filter;
 
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.Session;
 import org.apache.mina.io.IoHandler;
 import org.apache.mina.io.IoHandlerFilter;
 import org.apache.mina.io.IoSession;
-import org.apache.mina.util.BlockingSet;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.Stack;
+import org.apache.mina.util.BaseThreadPool;
+import org.apache.mina.util.EventType;
+import org.apache.mina.util.ThreadPool;
 
 /**
  * A Thread-pooling filter.  This filter forwards {@link IoHandler} events 
- * to its thread pool.  This is an implementation of
- * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
- * thread pool</a> by Douglas C. Schmidt et al.
+ * to its thread pool.
  * 
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$
+ * 
+ * @see ThreadPool
+ * @see BaseThreadPool
  */
-public class IoThreadPoolFilter implements IoHandlerFilter
+public class IoThreadPoolFilter extends BaseThreadPool implements ThreadPool, IoHandlerFilter
 {
     /**
-     * Default maximum size of thread pool (2G).
-     */
-    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
-
-    /**
-     * Default keep-alive time of thread pool (1 min).
-     */
-    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
-
-    private static volatile int threadId = 0;
-
-    private Map buffers = new IdentityHashMap();
-
-    private Stack followers = new Stack();
-
-    private Worker leader;
-
-    private BlockingSet readySessionBuffers = new BlockingSet();
-
-    private Set busySessionBuffers = new HashSet();
-
-    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
-
-    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
-
-    private boolean started;
-
-    private boolean shuttingDown;
-
-    private int poolSize;
-
-    private final Object poolSizeLock = new Object();
-
-    /**
      * Creates a new instanceof this filter with default thread pool settings.
      * You'll have to invoke {@link #start()} method to start threads actually.
      */
@@ -86,120 +48,6 @@
     {
     }
 
-    /**
-     * Returns the number of threads in the thread pool.
-     */
-    public int getPoolSize()
-    {
-        synchronized( poolSizeLock )
-        {
-            return poolSize;
-        }
-    }
-
-    /**
-     * Returns the maximum size of the thread pool.
-     */
-    public int getMaximumPoolSize()
-    {
-        return maximumPoolSize;
-    }
-
-    /**
-     * Returns the keep-alive time until the thread suicides after it became
-     * idle (milliseconds unit).
-     */
-    public int getKeepAliveTime()
-    {
-        return keepAliveTime;
-    }
-
-    /**
-     * Sets the maximum size of the thread pool.
-     */
-    public void setMaximumPoolSize( int maximumPoolSize )
-    {
-        if( maximumPoolSize <= 0 )
-            throw new IllegalArgumentException();
-        this.maximumPoolSize = maximumPoolSize;
-    }
-
-    /**
-     * Sets the keep-alive time until the thread suicides after it became idle
-     * (milliseconds unit).
-     */
-    public void setKeepAliveTime( int keepAliveTime )
-    {
-        this.keepAliveTime = keepAliveTime;
-    }
-
-    /**
-     * Starts thread pool threads and starts forwarding events to them.
-     */
-    public synchronized void start()
-    {
-        if( started )
-            return;
-
-        shuttingDown = false;
-
-        leader = new Worker();
-        leader.start();
-        leader.lead();
-
-        started = true;
-    }
-
-    /**
-     * Stops all thread pool threads.
-     */
-    public synchronized void stop()
-    {
-        if( !started )
-            return;
-
-        shuttingDown = true;
-        Worker lastLeader = null;
-        for( ;; )
-        {
-            Worker leader = this.leader;
-            if( lastLeader == leader )
-                break;
-
-            while( leader.isAlive() )
-            {
-                leader.interrupt();
-                try
-                {
-                    leader.join();
-                }
-                catch( InterruptedException e )
-                {
-                }
-            }
-
-            lastLeader = leader;
-        }
-
-        started = false;
-    }
-
-    private void increasePoolSize()
-    {
-        synchronized( poolSizeLock )
-        {
-            poolSize++;
-        }
-    }
-
-    private void decreasePoolSize()
-    {
-        synchronized( poolSizeLock )
-        {
-            poolSize--;
-        }
-    }
-
     public void sessionOpened( NextFilter nextFilter, IoSession session )
     {
         fireEvent( nextFilter, session, EventType.OPENED, null );
@@ -223,7 +71,7 @@
     }
 
     public void dataRead( NextFilter nextFilter, IoSession session,
-                         ByteBuffer buf )
+                          ByteBuffer buf )
     {
         // MINA will release the buffer if this method returns.
         buf.acquire();
@@ -236,318 +84,36 @@
         fireEvent( nextFilter, session, EventType.WRITTEN, marker );
     }
 
-    private void fireEvent( NextFilter nextFilter, IoSession session,
-                           EventType type, Object data )
-    {
-        SessionBuffer buf = getSessionBuffer( session );
-        synchronized( buf )
-        {
-            buf.nextFilters.push( nextFilter );
-            buf.eventTypes.push( type );
-            buf.eventDatum.push( data );
-        }
-
-        synchronized( readySessionBuffers )
-        {
-            if( !busySessionBuffers.contains( buf ) )
-            {
-                busySessionBuffers.add( buf );
-                readySessionBuffers.add( buf );
-            }
-        }
-    }
-
-    private SessionBuffer getSessionBuffer( IoSession session )
-    {
-        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
-        if( buf == null )
-        {
-            synchronized( buffers )
-            {
-                buf = ( SessionBuffer ) buffers.get( session );
-                if( buf == null )
-                {
-                    buf = new SessionBuffer( session );
-                    buffers.put( session, buf );
-                }
-            }
-        }
-        return buf;
-    }
-
-    private void removeSessionBuffer( SessionBuffer buf )
-    {
-        synchronized( buffers )
-        {
-            buffers.remove( buf.session );
-        }
-    }
-
-    private static class SessionBuffer
-    {
-
-        private final IoSession session;
-
-        private final Queue nextFilters = new Queue();
-
-        private final Queue eventTypes = new Queue();
-
-        private final Queue eventDatum = new Queue();
-
-        private SessionBuffer( IoSession session )
-        {
-            this.session = session;
-        }
-    }
-
-    private static class EventType
-    {
-        private static final EventType OPENED = new EventType();
-
-        private static final EventType CLOSED = new EventType();
-
-        private static final EventType READ = new EventType();
-
-        private static final EventType WRITTEN = new EventType();
-
-        private static final EventType IDLE = new EventType();
-
-        private static final EventType EXCEPTION = new EventType();
-
-        private EventType()
-        {
-        }
-    }
-
-    private class Worker extends Thread
+    protected void processEvent( Object nextFilter0, Session session0,
+                                 EventType type, Object data )
     {
-        private final Object promotionLock = new Object();
-
-        private Worker()
-        {
-            super( "IoThreadPool-" + ( threadId++ ) );
-            increasePoolSize();
-        }
-
-        public void lead()
-        {
-            synchronized( promotionLock )
-            {
-                leader = this;
-                promotionLock.notify();
-            }
-        }
-
-        public void run()
-        {
-            for( ;; )
-            {
-                if( !waitForPromotion() )
-                    break;
-
-                SessionBuffer buf = fetchBuffer();
-                giveUpLead();
-
-                if( buf == null )
-                {
-                    break;
-                }
-
-                processEvents( buf );
-                follow();
-                releaseBuffer( buf );
-            }
-
-            decreasePoolSize();
-        }
-
-        private SessionBuffer fetchBuffer()
+        NextFilter nextFilter = ( NextFilter ) nextFilter0;
+        IoSession session = ( IoSession ) session0;
+        if( type == EventType.READ )
         {
-            SessionBuffer buf = null;
-            synchronized( readySessionBuffers )
-            {
-                do
-                {
-                    buf = null;
-                    try
-                    {
-                        readySessionBuffers.waitForNewItem();
-                    }
-                    catch( InterruptedException e )
-                    {
-                        break;
-                    }
-
-                    Iterator it = readySessionBuffers.iterator();
-                    if( !it.hasNext() )
-                    {
-                        // exceeded keepAliveTime
-                        break;
-                    }
-
-                    do
-                    {
-                        buf = null;
-                        buf = ( SessionBuffer ) it.next();
-                        it.remove();
-                    }
-                    while( buf != null && buf.nextFilters.isEmpty()
-                           && it.hasNext() );
-                }
-                while( buf != null && buf.nextFilters.isEmpty() );
-            }
-
-            return buf;
-        }
-
-        private void processEvents( SessionBuffer buf )
-        {
-            IoSession session = buf.session;
-            for( ;; )
-            {
-                NextFilter nextFilter;
-                EventType type;
-                Object data;
-                synchronized( buf )
-                {
-                    nextFilter = ( NextFilter ) buf.nextFilters.pop();
-                    if( nextFilter == null )
-                        break;
-
-                    type = ( EventType ) buf.eventTypes.pop();
-                    data = buf.eventDatum.pop();
-                }
-                processEvent( nextFilter, session, type, data );
-            }
+            ByteBuffer buf = ( ByteBuffer ) data;
+            nextFilter.dataRead( session, buf );
+            buf.release();
         }
-
-        private void processEvent( NextFilter nextFilter, IoSession session,
-                                  EventType type, Object data )
+        else if( type == EventType.WRITTEN )
         {
-            if( type == EventType.READ )
-            {
-                ByteBuffer buf = ( ByteBuffer ) data;
-                nextFilter.dataRead( session, buf );
-                buf.release();
-            }
-            else if( type == EventType.WRITTEN )
-            {
-                nextFilter.dataWritten( session, data );
-            }
-            else if( type == EventType.EXCEPTION )
-            {
-                nextFilter.exceptionCaught( session, ( Throwable ) data );
-            }
-            else if( type == EventType.IDLE )
-            {
-                nextFilter.sessionIdle( session, ( IdleStatus ) data );
-            }
-            else if( type == EventType.OPENED )
-            {
-                nextFilter.sessionOpened( session );
-            }
-            else if( type == EventType.CLOSED )
-            {
-                nextFilter.sessionClosed( session );
-            }
+            nextFilter.dataWritten( session, data );
         }
-
-        private void follow()
+        else if( type == EventType.EXCEPTION )
         {
-            synchronized( promotionLock )
-            {
-                if( this != leader )
-                {
-                    synchronized( followers )
-                    {
-                        followers.push( this );
-                    }
-                }
-            }
+            nextFilter.exceptionCaught( session, ( Throwable ) data );
         }
-
-        private void releaseBuffer( SessionBuffer buf )
+        else if( type == EventType.IDLE )
         {
-            synchronized( readySessionBuffers )
-            {
-                busySessionBuffers.remove( buf );
-                if( buf.nextFilters.isEmpty() )
-                {
-                    removeSessionBuffer( buf );
-                }
-                else
-                {
-                    readySessionBuffers.add( buf );
-                }
-            }
+            nextFilter.sessionIdle( session, ( IdleStatus ) data );
         }
-
-        private boolean waitForPromotion()
+        else if( type == EventType.OPENED )
         {
-            synchronized( promotionLock )
-            {
-                if( this != leader )
-                {
-                    try
-                    {
-                        int keepAliveTime = getKeepAliveTime();
-                        if( keepAliveTime > 0 )
-                        {
-                            promotionLock.wait( keepAliveTime );
-                        }
-                        else
-                        {
-                            promotionLock.wait();
-                        }
-                    }
-                    catch( InterruptedException e )
-                    {
-                    }
-                }
-
-                boolean timeToLead = this == leader;
-
-                if( !timeToLead )
-                {
-                    // time to die
-                    synchronized( followers )
-                    {
-                        followers.remove( this );
-                    }
-                }
-
-                return timeToLead;
-            }
+            nextFilter.sessionOpened( session );
         }
-
-        private void giveUpLead()
+        else if( type == EventType.CLOSED )
         {
-            Worker worker;
-            synchronized( followers )
-            {
-                worker = ( Worker ) followers.pop();
-            }
-
-            if( worker != null )
-            {
-                worker.lead();
-            }
-            else
-            {
-                if( !shuttingDown )
-                {
-                    synchronized( IoThreadPoolFilter.this )
-                    {
-                        if( !shuttingDown
-                            && getPoolSize() < getMaximumPoolSize() )
-                        {
-                            worker = new Worker();
-                            worker.start();
-                            worker.lead();
-                        }
-                    }
-                }
-            }
+            nextFilter.sessionClosed( session );
         }
     }
 

Modified: directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java?rev=164162&r1=164161&r2=164162&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java Thu Apr 21 19:50:31 2005
@@ -18,157 +18,36 @@
  */
 package org.apache.mina.protocol.filter;
 
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.Session;
 import org.apache.mina.protocol.ProtocolHandler;
 import org.apache.mina.protocol.ProtocolHandlerFilter;
 import org.apache.mina.protocol.ProtocolSession;
-import org.apache.mina.util.BlockingSet;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.Stack;
+import org.apache.mina.util.BaseThreadPool;
+import org.apache.mina.util.EventType;
+import org.apache.mina.util.ThreadPool;
 
 /**
  * A Thread-pooling filter.  This filter forwards {@link ProtocolHandler} events
- * to its thread pool.  This is an implementation of
- * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
- * thread pool</a> by Douglas C. Schmidt et al.
+ * to its thread pool.
  * 
  * @author Trustin Lee (trustin@apache.org)
  * @version $Rev$, $Date$
  * 
- * @author Trustin Lee (trustin@apache.org)
- * @version $Rev$, $Date$
+ * @see ThreadPool
+ * @see BaseThreadPool
  */
-public class ProtocolThreadPoolFilter implements ProtocolHandlerFilter
+public class ProtocolThreadPoolFilter extends BaseThreadPool implements ThreadPool, ProtocolHandlerFilter
 {
-    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
-
-    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
-
-    private static volatile int threadId = 0;
-
-    private Map buffers = new IdentityHashMap();
-
-    private Stack followers = new Stack();
-
-    private Worker leader;
-
-    private BlockingSet readySessionBuffers = new BlockingSet();
-
-    private Set busySessionBuffers = new HashSet();
-
-    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
-
-    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
-
-    private boolean started;
-
-    private boolean shuttingDown;
-
-    private int poolSize;
-
-    private final Object poolSizeLock = new Object();
 
+    /**
+     * Creates a new instanceof this filter with default thread pool settings.
+     * You'll have to invoke {@link #start()} method to start threads actually.
+     */
     public ProtocolThreadPoolFilter()
     {
     }
 
-    public int getPoolSize()
-    {
-        synchronized( poolSizeLock )
-        {
-            return poolSize;
-        }
-    }
-
-    public int getMaximumPoolSize()
-    {
-        return maximumPoolSize;
-    }
-
-    public int getKeepAliveTime()
-    {
-        return keepAliveTime;
-    }
-
-    public void setMaximumPoolSize( int maximumPoolSize )
-    {
-        if( maximumPoolSize <= 0 )
-            throw new IllegalArgumentException();
-        this.maximumPoolSize = maximumPoolSize;
-    }
-
-    public void setKeepAliveTime( int keepAliveTime )
-    {
-        this.keepAliveTime = keepAliveTime;
-    }
-
-    public synchronized void start()
-    {
-        if( started )
-            return;
-
-        shuttingDown = false;
-
-        leader = new Worker();
-        leader.start();
-        leader.lead();
-
-        started = true;
-    }
-
-    public synchronized void stop()
-    {
-        if( !started )
-            return;
-
-        shuttingDown = true;
-        Worker lastLeader = null;
-        for( ;; )
-        {
-            Worker leader = this.leader;
-            if( lastLeader == leader )
-                break;
-
-            while( leader.isAlive() )
-            {
-                leader.interrupt();
-                try
-                {
-                    leader.join();
-                }
-                catch( InterruptedException e )
-                {
-                }
-            }
-
-            lastLeader = leader;
-        }
-
-        started = false;
-    }
-
-    private void increasePoolSize()
-    {
-        synchronized( poolSizeLock )
-        {
-            poolSize++;
-        }
-    }
-
-    private void decreasePoolSize()
-    {
-        synchronized( poolSizeLock )
-        {
-            poolSize--;
-        }
-    }
-
     public void sessionOpened( NextFilter nextFilter,
                               ProtocolSession session )
     {
@@ -205,315 +84,36 @@
         fireEvent( nextFilter, session, EventType.SENT, message );
     }
 
-    private void fireEvent( NextFilter nextFilter,
-                           ProtocolSession session, EventType type, Object data )
-    {
-        SessionBuffer buf = getSessionBuffer( session );
-        synchronized( buf )
-        {
-            buf.nextFilters.push( nextFilter );
-            buf.eventTypes.push( type );
-            buf.eventDatum.push( data );
-        }
-
-        synchronized( readySessionBuffers )
-        {
-            if( !busySessionBuffers.contains( buf ) )
-            {
-                busySessionBuffers.add( buf );
-                readySessionBuffers.add( buf );
-            }
-        }
-    }
-
-    private SessionBuffer getSessionBuffer( ProtocolSession session )
+    protected void processEvent( Object nextFilter0,
+                               Session session0, EventType type,
+                               Object data )
     {
-        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
-        if( buf == null )
-        {
-            synchronized( buffers )
-            {
-                buf = ( SessionBuffer ) buffers.get( session );
-                if( buf == null )
-                {
-                    buf = new SessionBuffer( session );
-                    buffers.put( session, buf );
-                }
-            }
-        }
-        return buf;
-    }
+        NextFilter nextFilter = ( NextFilter ) nextFilter0;
+        ProtocolSession session = ( ProtocolSession ) session0;
 
-    private void removeSessionBuffer( SessionBuffer buf )
-    {
-        synchronized( buffers )
+        if( type == EventType.RECEIVED )
         {
-            buffers.remove( buf.session );
+            nextFilter.messageReceived( session, data );
         }
-    }
-
-    private static class SessionBuffer
-    {
-
-        private final ProtocolSession session;
-
-        private final Queue nextFilters = new Queue();
-
-        private final Queue eventTypes = new Queue();
-
-        private final Queue eventDatum = new Queue();
-
-        private SessionBuffer( ProtocolSession session )
+        else if( type == EventType.SENT )
         {
-            this.session = session;
+            nextFilter.messageSent( session, data );
         }
-    }
-
-    private static class EventType
-    {
-        private static final EventType OPENED = new EventType();
-
-        private static final EventType CLOSED = new EventType();
-
-        private static final EventType RECEIVED = new EventType();
-
-        private static final EventType SENT = new EventType();
-
-        private static final EventType IDLE = new EventType();
-
-        private static final EventType EXCEPTION = new EventType();
-
-        private EventType()
+        else if( type == EventType.EXCEPTION )
         {
+            nextFilter.exceptionCaught( session, ( Throwable ) data );
         }
-    }
-
-    private class Worker extends Thread
-    {
-        private final Object promotionLock = new Object();
-
-        private Worker()
+        else if( type == EventType.IDLE )
         {
-            super( "ProtocolThreadPool-" + ( threadId++ ) );
-            increasePoolSize();
+            nextFilter.sessionIdle( session, ( IdleStatus ) data );
         }
-
-        public void lead()
+        else if( type == EventType.OPENED )
         {
-            synchronized( promotionLock )
-            {
-                leader = this;
-                promotionLock.notify();
-            }
+            nextFilter.sessionOpened( session );
         }
-
-        public void run()
+        else if( type == EventType.CLOSED )
         {
-            for( ;; )
-            {
-                if( !waitForPromotion() )
-                    break;
-
-                SessionBuffer buf = fetchBuffer();
-                giveUpLead();
-
-                if( buf == null )
-                    break;
-
-                processEvents( buf );
-                follow();
-                releaseBuffer( buf );
-            }
-
-            decreasePoolSize();
-        }
-
-        private SessionBuffer fetchBuffer()
-        {
-            SessionBuffer buf = null;
-            synchronized( readySessionBuffers )
-            {
-                do
-                {
-                    buf = null;
-                    try
-                    {
-                        readySessionBuffers.waitForNewItem();
-                    }
-                    catch( InterruptedException e )
-                    {
-                        break;
-                    }
-
-                    Iterator it = readySessionBuffers.iterator();
-                    if( !it.hasNext() )
-                    {
-                        // exceeded keepAliveTime
-                        break;
-                    }
-
-                    do
-                    {
-                        buf = null;
-                        buf = ( SessionBuffer ) it.next();
-                        it.remove();
-                    }
-                    while( buf != null && buf.nextFilters.isEmpty()
-                           && it.hasNext() );
-                }
-                while( buf != null && buf.nextFilters.isEmpty() );
-            }
-
-            return buf;
-        }
-
-        private void processEvents( SessionBuffer buf )
-        {
-            ProtocolSession session = buf.session;
-            for( ;; )
-            {
-                NextFilter nextFilter;
-                EventType type;
-                Object data;
-                synchronized( buf )
-                {
-                    nextFilter = ( NextFilter ) buf.nextFilters.pop();
-                    if( nextFilter == null )
-                        break;
-
-                    type = ( EventType ) buf.eventTypes.pop();
-                    data = buf.eventDatum.pop();
-                }
-                processEvent( nextFilter, session, type, data );
-            }
-        }
-
-        private void processEvent( NextFilter nextFilter,
-                                  ProtocolSession session, EventType type,
-                                  Object data )
-        {
-            if( type == EventType.RECEIVED )
-            {
-                nextFilter.messageReceived( session, data );
-            }
-            else if( type == EventType.SENT )
-            {
-                nextFilter.messageSent( session, data );
-            }
-            else if( type == EventType.EXCEPTION )
-            {
-                nextFilter.exceptionCaught( session, ( Throwable ) data );
-            }
-            else if( type == EventType.IDLE )
-            {
-                nextFilter.sessionIdle( session, ( IdleStatus ) data );
-            }
-            else if( type == EventType.OPENED )
-            {
-                nextFilter.sessionOpened( session );
-            }
-            else if( type == EventType.CLOSED )
-            {
-                nextFilter.sessionClosed( session );
-            }
-        }
-
-        private void follow()
-        {
-            synchronized( promotionLock )
-            {
-                if( this != leader )
-                {
-                    synchronized( followers )
-                    {
-                        followers.push( this );
-                    }
-                }
-            }
-        }
-
-        private void releaseBuffer( SessionBuffer buf )
-        {
-            synchronized( readySessionBuffers )
-            {
-                busySessionBuffers.remove( buf );
-                if( buf.nextFilters.isEmpty() )
-                {
-                    removeSessionBuffer( buf );
-                }
-                else
-                {
-                    readySessionBuffers.add( buf );
-                }
-            }
-        }
-
-        private boolean waitForPromotion()
-        {
-            synchronized( promotionLock )
-            {
-                if( this != leader )
-                {
-                    try
-                    {
-                        int keepAliveTime = getKeepAliveTime();
-                        if( keepAliveTime > 0 )
-                        {
-                            promotionLock.wait( keepAliveTime );
-                        }
-                        else
-                        {
-                            promotionLock.wait();
-                        }
-                    }
-                    catch( InterruptedException e )
-                    {
-                    }
-                }
-
-                boolean timeToLead = this == leader;
-
-                if( !timeToLead )
-                {
-                    // time to die
-                    synchronized( followers )
-                    {
-                        followers.remove( this );
-                    }
-                }
-
-                return timeToLead;
-            }
-        }
-
-        private void giveUpLead()
-        {
-            Worker worker;
-            synchronized( followers )
-            {
-                worker = ( Worker ) followers.pop();
-            }
-
-            if( worker != null )
-            {
-                worker.lead();
-            }
-            else
-            {
-                if( !shuttingDown )
-                {
-                    synchronized( ProtocolThreadPoolFilter.this )
-                    {
-                        if( !shuttingDown
-                            && getPoolSize() < getMaximumPoolSize() )
-                        {
-                            worker = new Worker();
-                            worker.start();
-                            worker.lead();
-                        }
-                    }
-                }
-            }
+            nextFilter.sessionClosed( session );
         }
     }
 

Modified: directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java?rev=164162&r1=164161&r2=164162&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java Thu Apr 21 19:50:31 2005
@@ -16,7 +16,6 @@
  *   limitations under the License.
  *
  */
-
 package org.apache.mina.util;
 
 import java.io.IOException;

Added: directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java?rev=164162&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java Thu Apr 21 19:50:31 2005
@@ -0,0 +1,442 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.Session;
+
+/**
+ * A base implementation of Thread-pooling filters.
+ * This filter forwards events to its thread pool.  This is an implementation of
+ * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
+ * thread pool</a> by Douglas C. Schmidt et al.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class BaseThreadPool implements ThreadPool
+{
+    /**
+     * Default maximum size of thread pool (2G).
+     */
+    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
+
+    /**
+     * Default keep-alive time of thread pool (1 min).
+     */
+    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+    private static volatile int threadId = 0;
+
+    private Map buffers = new IdentityHashMap();
+
+    private Stack followers = new Stack();
+
+    private Worker leader;
+
+    private BlockingSet readySessionBuffers = new BlockingSet();
+
+    private Set busySessionBuffers = new HashSet();
+
+    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+
+    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+    private boolean started;
+
+    private boolean shuttingDown;
+
+    private int poolSize;
+
+    private final Object poolSizeLock = new Object();
+
+    /**
+     * Creates a new instance with default thread pool settings.
+     * You'll have to invoke {@link #start()} method to start threads actually.
+     */
+    protected BaseThreadPool()
+    {
+    }
+
+    public int getPoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            return poolSize;
+        }
+    }
+
+    public int getMaximumPoolSize()
+    {
+        return maximumPoolSize;
+    }
+
+    public int getKeepAliveTime()
+    {
+        return keepAliveTime;
+    }
+
+    public void setMaximumPoolSize( int maximumPoolSize )
+    {
+        if( maximumPoolSize <= 0 )
+            throw new IllegalArgumentException();
+        this.maximumPoolSize = maximumPoolSize;
+    }
+
+    public void setKeepAliveTime( int keepAliveTime )
+    {
+        this.keepAliveTime = keepAliveTime;
+    }
+
+    public synchronized void start()
+    {
+        if( started )
+            return;
+
+        shuttingDown = false;
+
+        leader = new Worker();
+        leader.start();
+        leader.lead();
+
+        started = true;
+    }
+
+    public synchronized void stop()
+    {
+        if( !started )
+            return;
+
+        shuttingDown = true;
+        Worker lastLeader = null;
+        for( ;; )
+        {
+            Worker leader = this.leader;
+            if( lastLeader == leader )
+                break;
+
+            while( leader.isAlive() )
+            {
+                leader.interrupt();
+                try
+                {
+                    leader.join();
+                }
+                catch( InterruptedException e )
+                {
+                }
+            }
+
+            lastLeader = leader;
+        }
+
+        started = false;
+    }
+
+    private void increasePoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize++;
+        }
+    }
+
+    private void decreasePoolSize()
+    {
+        synchronized( poolSizeLock )
+        {
+            poolSize--;
+        }
+    }
+
+    protected void fireEvent( Object nextFilter, Session session,
+                              EventType type, Object data )
+    {
+        SessionBuffer buf = getSessionBuffer( session );
+        synchronized( buf )
+        {
+            buf.nextFilters.push( nextFilter );
+            buf.eventTypes.push( type );
+            buf.eventDatum.push( data );
+        }
+
+        synchronized( readySessionBuffers )
+        {
+            if( !busySessionBuffers.contains( buf ) )
+            {
+                busySessionBuffers.add( buf );
+                readySessionBuffers.add( buf );
+            }
+        }
+    }
+
+    /**
+     * Implement this method to forward events to <tt>nextFilter</tt>.
+     */
+    protected abstract void processEvent( Object nextFilter, Session session,
+                                          EventType type, Object data );
+
+    private SessionBuffer getSessionBuffer( Session session )
+    {
+        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
+        if( buf == null )
+        {
+            synchronized( buffers )
+            {
+                buf = ( SessionBuffer ) buffers.get( session );
+                if( buf == null )
+                {
+                    buf = new SessionBuffer( session );
+                    buffers.put( session, buf );
+                }
+            }
+        }
+        return buf;
+    }
+
+    private void removeSessionBuffer( SessionBuffer buf )
+    {
+        synchronized( buffers )
+        {
+            buffers.remove( buf.session );
+        }
+    }
+
+    private static class SessionBuffer
+    {
+        private final Session session;
+
+        private final Queue nextFilters = new Queue();
+
+        private final Queue eventTypes = new Queue();
+
+        private final Queue eventDatum = new Queue();
+
+        private SessionBuffer( Session session )
+        {
+            this.session = session;
+        }
+    }
+
+    private class Worker extends Thread
+    {
+        private final Object promotionLock = new Object();
+
+        private Worker()
+        {
+            super( "IoThreadPool-" + ( threadId++ ) );
+            increasePoolSize();
+        }
+
+        public void lead()
+        {
+            synchronized( promotionLock )
+            {
+                leader = this;
+                promotionLock.notify();
+            }
+        }
+
+        public void run()
+        {
+            for( ;; )
+            {
+                if( !waitForPromotion() )
+                    break;
+
+                SessionBuffer buf = fetchBuffer();
+                giveUpLead();
+
+                if( buf == null )
+                {
+                    break;
+                }
+
+                processEvents( buf );
+                follow();
+                releaseBuffer( buf );
+            }
+
+            decreasePoolSize();
+        }
+
+        private SessionBuffer fetchBuffer()
+        {
+            SessionBuffer buf = null;
+            synchronized( readySessionBuffers )
+            {
+                do
+                {
+                    buf = null;
+                    try
+                    {
+                        readySessionBuffers.waitForNewItem();
+                    }
+                    catch( InterruptedException e )
+                    {
+                        break;
+                    }
+
+                    Iterator it = readySessionBuffers.iterator();
+                    if( !it.hasNext() )
+                    {
+                        // exceeded keepAliveTime
+                        break;
+                    }
+
+                    do
+                    {
+                        buf = null;
+                        buf = ( SessionBuffer ) it.next();
+                        it.remove();
+                    }
+                    while( buf != null && buf.nextFilters.isEmpty()
+                           && it.hasNext() );
+                }
+                while( buf != null && buf.nextFilters.isEmpty() );
+            }
+
+            return buf;
+        }
+
+        private void processEvents( SessionBuffer buf )
+        {
+            Session session = buf.session;
+            for( ;; )
+            {
+                Object nextFilter;
+                EventType type;
+                Object data;
+                synchronized( buf )
+                {
+                    nextFilter = buf.nextFilters.pop();
+                    if( nextFilter == null )
+                        break;
+
+                    type = ( EventType ) buf.eventTypes.pop();
+                    data = buf.eventDatum.pop();
+                }
+                processEvent( nextFilter, session, type, data );
+            }
+        }
+
+        private void follow()
+        {
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    synchronized( followers )
+                    {
+                        followers.push( this );
+                    }
+                }
+            }
+        }
+
+        private void releaseBuffer( SessionBuffer buf )
+        {
+            synchronized( readySessionBuffers )
+            {
+                busySessionBuffers.remove( buf );
+                if( buf.nextFilters.isEmpty() )
+                {
+                    removeSessionBuffer( buf );
+                }
+                else
+                {
+                    readySessionBuffers.add( buf );
+                }
+            }
+        }
+
+        private boolean waitForPromotion()
+        {
+            synchronized( promotionLock )
+            {
+                if( this != leader )
+                {
+                    try
+                    {
+                        int keepAliveTime = getKeepAliveTime();
+                        if( keepAliveTime > 0 )
+                        {
+                            promotionLock.wait( keepAliveTime );
+                        }
+                        else
+                        {
+                            promotionLock.wait();
+                        }
+                    }
+                    catch( InterruptedException e )
+                    {
+                    }
+                }
+
+                boolean timeToLead = this == leader;
+
+                if( !timeToLead )
+                {
+                    // time to die
+                    synchronized( followers )
+                    {
+                        followers.remove( this );
+                    }
+                }
+
+                return timeToLead;
+            }
+        }
+
+        private void giveUpLead()
+        {
+            Worker worker;
+            synchronized( followers )
+            {
+                worker = ( Worker ) followers.pop();
+            }
+
+            if( worker != null )
+            {
+                worker.lead();
+            }
+            else
+            {
+                if( !shuttingDown )
+                {
+                    synchronized( BaseThreadPool.this )
+                    {
+                        if( !shuttingDown
+                            && getPoolSize() < getMaximumPoolSize() )
+                        {
+                            worker = new Worker();
+                            worker.start();
+                            worker.lead();
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

Propchange: directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision

Added: directory/network/trunk/src/java/org/apache/mina/util/EventType.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/EventType.java?rev=164162&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/EventType.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/EventType.java Thu Apr 21 19:50:31 2005
@@ -0,0 +1,50 @@
+/*
+ *   @(#) $Id: AvailablePortFinder.java 155923 2005-03-02 14:23:42Z trustin $
+ * 
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * Enumeration for MINA event types.
+ * Used by {@link ThreadPool}s when they push events to event queue.
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public class EventType
+{
+    public static final EventType OPENED = new EventType();
+
+    public static final EventType CLOSED = new EventType();
+
+    public static final EventType READ = new EventType();
+
+    public static final EventType WRITTEN = new EventType();
+
+    public static final EventType RECEIVED = new EventType();
+
+    public static final EventType SENT = new EventType();
+
+    public static final EventType IDLE = new EventType();
+
+    public static final EventType EXCEPTION = new EventType();
+
+    private EventType()
+    {
+    }
+}
\ No newline at end of file

Added: directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java?rev=164162&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java Thu Apr 21 19:50:31 2005
@@ -0,0 +1,66 @@
+/*
+ *   @(#) $Id$
+ *
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A generic thread pool interface.
+ * 
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ThreadPool {
+    
+    /**
+     * Returns the number of threads in the thread pool.
+     */
+    int getPoolSize();
+
+    /**
+     * Returns the maximum size of the thread pool.
+     */
+    int getMaximumPoolSize();
+
+    /**
+     * Returns the keep-alive time until the thread suicides after it became
+     * idle (milliseconds unit).
+     */
+    int getKeepAliveTime();
+
+    /**
+     * Sets the maximum size of the thread pool.
+     */
+    void setMaximumPoolSize( int maximumPoolSize );
+
+    /**
+     * Sets the keep-alive time until the thread suicides after it became idle
+     * (milliseconds unit).
+     */
+    void setKeepAliveTime( int keepAliveTime );
+
+    /**
+     * Starts thread pool threads and starts forwarding events to them.
+     */
+    void start();
+
+    /**
+     * Stops all thread pool threads.
+     */
+    void stop();
+}

Propchange: directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java
------------------------------------------------------------------------------
    svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision