You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2005/04/22 04:50:32 UTC
svn commit: r164162 - in /directory/network/trunk/src/java/org/apache/mina:
io/filter/IoThreadPoolFilter.java
protocol/filter/ProtocolThreadPoolFilter.java util/AvailablePortFinder.java
util/BaseThreadPool.java util/EventType.java util/ThreadPool.java
Author: trustin
Date: Thu Apr 21 19:50:31 2005
New Revision: 164162
URL: http://svn.apache.org/viewcvs?rev=164162&view=rev
Log:
* Refactored thread pool filters
** Added ThreadPool interface that thread pool filters implement
** Added BaseThreadPool (implements ThreadPool) that thread pool filters extend
Added:
directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java (with props)
directory/network/trunk/src/java/org/apache/mina/util/EventType.java
directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java (with props)
Modified:
directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java
directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java
directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java
Modified: directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java?rev=164162&r1=164161&r2=164162&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/io/filter/IoThreadPoolFilter.java Thu Apr 21 19:50:31 2005
@@ -18,67 +18,29 @@
*/
package org.apache.mina.io.filter;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.Session;
import org.apache.mina.io.IoHandler;
import org.apache.mina.io.IoHandlerFilter;
import org.apache.mina.io.IoSession;
-import org.apache.mina.util.BlockingSet;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.Stack;
+import org.apache.mina.util.BaseThreadPool;
+import org.apache.mina.util.EventType;
+import org.apache.mina.util.ThreadPool;
/**
* A Thread-pooling filter. This filter forwards {@link IoHandler} events
- * to its thread pool. This is an implementation of
- * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
- * thread pool</a> by Douglas C. Schmidt et al.
+ * to its thread pool.
*
* @author Trustin Lee (trustin@apache.org)
* @version $Rev$, $Date$
+ *
+ * @see ThreadPool
+ * @see BaseThreadPool
*/
-public class IoThreadPoolFilter implements IoHandlerFilter
+public class IoThreadPoolFilter extends BaseThreadPool implements ThreadPool, IoHandlerFilter
{
/**
- * Default maximum size of thread pool (2G).
- */
- public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
-
- /**
- * Default keep-alive time of thread pool (1 min).
- */
- public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
-
- private static volatile int threadId = 0;
-
- private Map buffers = new IdentityHashMap();
-
- private Stack followers = new Stack();
-
- private Worker leader;
-
- private BlockingSet readySessionBuffers = new BlockingSet();
-
- private Set busySessionBuffers = new HashSet();
-
- private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
-
- private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
-
- private boolean started;
-
- private boolean shuttingDown;
-
- private int poolSize;
-
- private final Object poolSizeLock = new Object();
-
- /**
* Creates a new instance of this filter with default thread pool settings.
* You must invoke the {@link #start()} method to actually start the threads.
*/
@@ -86,120 +48,6 @@
{
}
- /**
- * Returns the number of threads in the thread pool.
- */
- public int getPoolSize()
- {
- synchronized( poolSizeLock )
- {
- return poolSize;
- }
- }
-
- /**
- * Returns the maximum size of the thread pool.
- */
- public int getMaximumPoolSize()
- {
- return maximumPoolSize;
- }
-
- /**
- * Returns the keep-alive time until the thread suicides after it became
- * idle (milliseconds unit).
- */
- public int getKeepAliveTime()
- {
- return keepAliveTime;
- }
-
- /**
- * Sets the maximum size of the thread pool.
- */
- public void setMaximumPoolSize( int maximumPoolSize )
- {
- if( maximumPoolSize <= 0 )
- throw new IllegalArgumentException();
- this.maximumPoolSize = maximumPoolSize;
- }
-
- /**
- * Sets the keep-alive time until the thread suicides after it became idle
- * (milliseconds unit).
- */
- public void setKeepAliveTime( int keepAliveTime )
- {
- this.keepAliveTime = keepAliveTime;
- }
-
- /**
- * Starts thread pool threads and starts forwarding events to them.
- */
- public synchronized void start()
- {
- if( started )
- return;
-
- shuttingDown = false;
-
- leader = new Worker();
- leader.start();
- leader.lead();
-
- started = true;
- }
-
- /**
- * Stops all thread pool threads.
- */
- public synchronized void stop()
- {
- if( !started )
- return;
-
- shuttingDown = true;
- Worker lastLeader = null;
- for( ;; )
- {
- Worker leader = this.leader;
- if( lastLeader == leader )
- break;
-
- while( leader.isAlive() )
- {
- leader.interrupt();
- try
- {
- leader.join();
- }
- catch( InterruptedException e )
- {
- }
- }
-
- lastLeader = leader;
- }
-
- started = false;
- }
-
- private void increasePoolSize()
- {
- synchronized( poolSizeLock )
- {
- poolSize++;
- }
- }
-
- private void decreasePoolSize()
- {
- synchronized( poolSizeLock )
- {
- poolSize--;
- }
- }
-
public void sessionOpened( NextFilter nextFilter, IoSession session )
{
fireEvent( nextFilter, session, EventType.OPENED, null );
@@ -223,7 +71,7 @@
}
public void dataRead( NextFilter nextFilter, IoSession session,
- ByteBuffer buf )
+ ByteBuffer buf )
{
// MINA will release the buffer if this method returns.
buf.acquire();
@@ -236,318 +84,36 @@
fireEvent( nextFilter, session, EventType.WRITTEN, marker );
}
- private void fireEvent( NextFilter nextFilter, IoSession session,
- EventType type, Object data )
- {
- SessionBuffer buf = getSessionBuffer( session );
- synchronized( buf )
- {
- buf.nextFilters.push( nextFilter );
- buf.eventTypes.push( type );
- buf.eventDatum.push( data );
- }
-
- synchronized( readySessionBuffers )
- {
- if( !busySessionBuffers.contains( buf ) )
- {
- busySessionBuffers.add( buf );
- readySessionBuffers.add( buf );
- }
- }
- }
-
- private SessionBuffer getSessionBuffer( IoSession session )
- {
- SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
- if( buf == null )
- {
- synchronized( buffers )
- {
- buf = ( SessionBuffer ) buffers.get( session );
- if( buf == null )
- {
- buf = new SessionBuffer( session );
- buffers.put( session, buf );
- }
- }
- }
- return buf;
- }
-
- private void removeSessionBuffer( SessionBuffer buf )
- {
- synchronized( buffers )
- {
- buffers.remove( buf.session );
- }
- }
-
- private static class SessionBuffer
- {
-
- private final IoSession session;
-
- private final Queue nextFilters = new Queue();
-
- private final Queue eventTypes = new Queue();
-
- private final Queue eventDatum = new Queue();
-
- private SessionBuffer( IoSession session )
- {
- this.session = session;
- }
- }
-
- private static class EventType
- {
- private static final EventType OPENED = new EventType();
-
- private static final EventType CLOSED = new EventType();
-
- private static final EventType READ = new EventType();
-
- private static final EventType WRITTEN = new EventType();
-
- private static final EventType IDLE = new EventType();
-
- private static final EventType EXCEPTION = new EventType();
-
- private EventType()
- {
- }
- }
-
- private class Worker extends Thread
+ protected void processEvent( Object nextFilter0, Session session0,
+ EventType type, Object data )
{
- private final Object promotionLock = new Object();
-
- private Worker()
- {
- super( "IoThreadPool-" + ( threadId++ ) );
- increasePoolSize();
- }
-
- public void lead()
- {
- synchronized( promotionLock )
- {
- leader = this;
- promotionLock.notify();
- }
- }
-
- public void run()
- {
- for( ;; )
- {
- if( !waitForPromotion() )
- break;
-
- SessionBuffer buf = fetchBuffer();
- giveUpLead();
-
- if( buf == null )
- {
- break;
- }
-
- processEvents( buf );
- follow();
- releaseBuffer( buf );
- }
-
- decreasePoolSize();
- }
-
- private SessionBuffer fetchBuffer()
+ NextFilter nextFilter = ( NextFilter ) nextFilter0;
+ IoSession session = ( IoSession ) session0;
+ if( type == EventType.READ )
{
- SessionBuffer buf = null;
- synchronized( readySessionBuffers )
- {
- do
- {
- buf = null;
- try
- {
- readySessionBuffers.waitForNewItem();
- }
- catch( InterruptedException e )
- {
- break;
- }
-
- Iterator it = readySessionBuffers.iterator();
- if( !it.hasNext() )
- {
- // exceeded keepAliveTime
- break;
- }
-
- do
- {
- buf = null;
- buf = ( SessionBuffer ) it.next();
- it.remove();
- }
- while( buf != null && buf.nextFilters.isEmpty()
- && it.hasNext() );
- }
- while( buf != null && buf.nextFilters.isEmpty() );
- }
-
- return buf;
- }
-
- private void processEvents( SessionBuffer buf )
- {
- IoSession session = buf.session;
- for( ;; )
- {
- NextFilter nextFilter;
- EventType type;
- Object data;
- synchronized( buf )
- {
- nextFilter = ( NextFilter ) buf.nextFilters.pop();
- if( nextFilter == null )
- break;
-
- type = ( EventType ) buf.eventTypes.pop();
- data = buf.eventDatum.pop();
- }
- processEvent( nextFilter, session, type, data );
- }
+ ByteBuffer buf = ( ByteBuffer ) data;
+ nextFilter.dataRead( session, buf );
+ buf.release();
}
-
- private void processEvent( NextFilter nextFilter, IoSession session,
- EventType type, Object data )
+ else if( type == EventType.WRITTEN )
{
- if( type == EventType.READ )
- {
- ByteBuffer buf = ( ByteBuffer ) data;
- nextFilter.dataRead( session, buf );
- buf.release();
- }
- else if( type == EventType.WRITTEN )
- {
- nextFilter.dataWritten( session, data );
- }
- else if( type == EventType.EXCEPTION )
- {
- nextFilter.exceptionCaught( session, ( Throwable ) data );
- }
- else if( type == EventType.IDLE )
- {
- nextFilter.sessionIdle( session, ( IdleStatus ) data );
- }
- else if( type == EventType.OPENED )
- {
- nextFilter.sessionOpened( session );
- }
- else if( type == EventType.CLOSED )
- {
- nextFilter.sessionClosed( session );
- }
+ nextFilter.dataWritten( session, data );
}
-
- private void follow()
+ else if( type == EventType.EXCEPTION )
{
- synchronized( promotionLock )
- {
- if( this != leader )
- {
- synchronized( followers )
- {
- followers.push( this );
- }
- }
- }
+ nextFilter.exceptionCaught( session, ( Throwable ) data );
}
-
- private void releaseBuffer( SessionBuffer buf )
+ else if( type == EventType.IDLE )
{
- synchronized( readySessionBuffers )
- {
- busySessionBuffers.remove( buf );
- if( buf.nextFilters.isEmpty() )
- {
- removeSessionBuffer( buf );
- }
- else
- {
- readySessionBuffers.add( buf );
- }
- }
+ nextFilter.sessionIdle( session, ( IdleStatus ) data );
}
-
- private boolean waitForPromotion()
+ else if( type == EventType.OPENED )
{
- synchronized( promotionLock )
- {
- if( this != leader )
- {
- try
- {
- int keepAliveTime = getKeepAliveTime();
- if( keepAliveTime > 0 )
- {
- promotionLock.wait( keepAliveTime );
- }
- else
- {
- promotionLock.wait();
- }
- }
- catch( InterruptedException e )
- {
- }
- }
-
- boolean timeToLead = this == leader;
-
- if( !timeToLead )
- {
- // time to die
- synchronized( followers )
- {
- followers.remove( this );
- }
- }
-
- return timeToLead;
- }
+ nextFilter.sessionOpened( session );
}
-
- private void giveUpLead()
+ else if( type == EventType.CLOSED )
{
- Worker worker;
- synchronized( followers )
- {
- worker = ( Worker ) followers.pop();
- }
-
- if( worker != null )
- {
- worker.lead();
- }
- else
- {
- if( !shuttingDown )
- {
- synchronized( IoThreadPoolFilter.this )
- {
- if( !shuttingDown
- && getPoolSize() < getMaximumPoolSize() )
- {
- worker = new Worker();
- worker.start();
- worker.lead();
- }
- }
- }
- }
+ nextFilter.sessionClosed( session );
}
}
Modified: directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java?rev=164162&r1=164161&r2=164162&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/protocol/filter/ProtocolThreadPoolFilter.java Thu Apr 21 19:50:31 2005
@@ -18,157 +18,36 @@
*/
package org.apache.mina.protocol.filter;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.Session;
import org.apache.mina.protocol.ProtocolHandler;
import org.apache.mina.protocol.ProtocolHandlerFilter;
import org.apache.mina.protocol.ProtocolSession;
-import org.apache.mina.util.BlockingSet;
-import org.apache.mina.util.Queue;
-import org.apache.mina.util.Stack;
+import org.apache.mina.util.BaseThreadPool;
+import org.apache.mina.util.EventType;
+import org.apache.mina.util.ThreadPool;
/**
* A Thread-pooling filter. This filter forwards {@link ProtocolHandler} events
- * to its thread pool. This is an implementation of
- * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
- * thread pool</a> by Douglas C. Schmidt et al.
+ * to its thread pool.
*
* @author Trustin Lee (trustin@apache.org)
* @version $Rev$, $Date$
*
- * @author Trustin Lee (trustin@apache.org)
- * @version $Rev$, $Date$
+ * @see ThreadPool
+ * @see BaseThreadPool
*/
-public class ProtocolThreadPoolFilter implements ProtocolHandlerFilter
+public class ProtocolThreadPoolFilter extends BaseThreadPool implements ThreadPool, ProtocolHandlerFilter
{
- public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
-
- public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
-
- private static volatile int threadId = 0;
-
- private Map buffers = new IdentityHashMap();
-
- private Stack followers = new Stack();
-
- private Worker leader;
-
- private BlockingSet readySessionBuffers = new BlockingSet();
-
- private Set busySessionBuffers = new HashSet();
-
- private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
-
- private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
-
- private boolean started;
-
- private boolean shuttingDown;
-
- private int poolSize;
-
- private final Object poolSizeLock = new Object();
+ /**
+ * Creates a new instance of this filter with default thread pool settings.
+ * You must invoke the {@link #start()} method to actually start the threads.
+ */
public ProtocolThreadPoolFilter()
{
}
- public int getPoolSize()
- {
- synchronized( poolSizeLock )
- {
- return poolSize;
- }
- }
-
- public int getMaximumPoolSize()
- {
- return maximumPoolSize;
- }
-
- public int getKeepAliveTime()
- {
- return keepAliveTime;
- }
-
- public void setMaximumPoolSize( int maximumPoolSize )
- {
- if( maximumPoolSize <= 0 )
- throw new IllegalArgumentException();
- this.maximumPoolSize = maximumPoolSize;
- }
-
- public void setKeepAliveTime( int keepAliveTime )
- {
- this.keepAliveTime = keepAliveTime;
- }
-
- public synchronized void start()
- {
- if( started )
- return;
-
- shuttingDown = false;
-
- leader = new Worker();
- leader.start();
- leader.lead();
-
- started = true;
- }
-
- public synchronized void stop()
- {
- if( !started )
- return;
-
- shuttingDown = true;
- Worker lastLeader = null;
- for( ;; )
- {
- Worker leader = this.leader;
- if( lastLeader == leader )
- break;
-
- while( leader.isAlive() )
- {
- leader.interrupt();
- try
- {
- leader.join();
- }
- catch( InterruptedException e )
- {
- }
- }
-
- lastLeader = leader;
- }
-
- started = false;
- }
-
- private void increasePoolSize()
- {
- synchronized( poolSizeLock )
- {
- poolSize++;
- }
- }
-
- private void decreasePoolSize()
- {
- synchronized( poolSizeLock )
- {
- poolSize--;
- }
- }
-
public void sessionOpened( NextFilter nextFilter,
ProtocolSession session )
{
@@ -205,315 +84,36 @@
fireEvent( nextFilter, session, EventType.SENT, message );
}
- private void fireEvent( NextFilter nextFilter,
- ProtocolSession session, EventType type, Object data )
- {
- SessionBuffer buf = getSessionBuffer( session );
- synchronized( buf )
- {
- buf.nextFilters.push( nextFilter );
- buf.eventTypes.push( type );
- buf.eventDatum.push( data );
- }
-
- synchronized( readySessionBuffers )
- {
- if( !busySessionBuffers.contains( buf ) )
- {
- busySessionBuffers.add( buf );
- readySessionBuffers.add( buf );
- }
- }
- }
-
- private SessionBuffer getSessionBuffer( ProtocolSession session )
+ protected void processEvent( Object nextFilter0,
+ Session session0, EventType type,
+ Object data )
{
- SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
- if( buf == null )
- {
- synchronized( buffers )
- {
- buf = ( SessionBuffer ) buffers.get( session );
- if( buf == null )
- {
- buf = new SessionBuffer( session );
- buffers.put( session, buf );
- }
- }
- }
- return buf;
- }
+ NextFilter nextFilter = ( NextFilter ) nextFilter0;
+ ProtocolSession session = ( ProtocolSession ) session0;
- private void removeSessionBuffer( SessionBuffer buf )
- {
- synchronized( buffers )
+ if( type == EventType.RECEIVED )
{
- buffers.remove( buf.session );
+ nextFilter.messageReceived( session, data );
}
- }
-
- private static class SessionBuffer
- {
-
- private final ProtocolSession session;
-
- private final Queue nextFilters = new Queue();
-
- private final Queue eventTypes = new Queue();
-
- private final Queue eventDatum = new Queue();
-
- private SessionBuffer( ProtocolSession session )
+ else if( type == EventType.SENT )
{
- this.session = session;
+ nextFilter.messageSent( session, data );
}
- }
-
- private static class EventType
- {
- private static final EventType OPENED = new EventType();
-
- private static final EventType CLOSED = new EventType();
-
- private static final EventType RECEIVED = new EventType();
-
- private static final EventType SENT = new EventType();
-
- private static final EventType IDLE = new EventType();
-
- private static final EventType EXCEPTION = new EventType();
-
- private EventType()
+ else if( type == EventType.EXCEPTION )
{
+ nextFilter.exceptionCaught( session, ( Throwable ) data );
}
- }
-
- private class Worker extends Thread
- {
- private final Object promotionLock = new Object();
-
- private Worker()
+ else if( type == EventType.IDLE )
{
- super( "ProtocolThreadPool-" + ( threadId++ ) );
- increasePoolSize();
+ nextFilter.sessionIdle( session, ( IdleStatus ) data );
}
-
- public void lead()
+ else if( type == EventType.OPENED )
{
- synchronized( promotionLock )
- {
- leader = this;
- promotionLock.notify();
- }
+ nextFilter.sessionOpened( session );
}
-
- public void run()
+ else if( type == EventType.CLOSED )
{
- for( ;; )
- {
- if( !waitForPromotion() )
- break;
-
- SessionBuffer buf = fetchBuffer();
- giveUpLead();
-
- if( buf == null )
- break;
-
- processEvents( buf );
- follow();
- releaseBuffer( buf );
- }
-
- decreasePoolSize();
- }
-
- private SessionBuffer fetchBuffer()
- {
- SessionBuffer buf = null;
- synchronized( readySessionBuffers )
- {
- do
- {
- buf = null;
- try
- {
- readySessionBuffers.waitForNewItem();
- }
- catch( InterruptedException e )
- {
- break;
- }
-
- Iterator it = readySessionBuffers.iterator();
- if( !it.hasNext() )
- {
- // exceeded keepAliveTime
- break;
- }
-
- do
- {
- buf = null;
- buf = ( SessionBuffer ) it.next();
- it.remove();
- }
- while( buf != null && buf.nextFilters.isEmpty()
- && it.hasNext() );
- }
- while( buf != null && buf.nextFilters.isEmpty() );
- }
-
- return buf;
- }
-
- private void processEvents( SessionBuffer buf )
- {
- ProtocolSession session = buf.session;
- for( ;; )
- {
- NextFilter nextFilter;
- EventType type;
- Object data;
- synchronized( buf )
- {
- nextFilter = ( NextFilter ) buf.nextFilters.pop();
- if( nextFilter == null )
- break;
-
- type = ( EventType ) buf.eventTypes.pop();
- data = buf.eventDatum.pop();
- }
- processEvent( nextFilter, session, type, data );
- }
- }
-
- private void processEvent( NextFilter nextFilter,
- ProtocolSession session, EventType type,
- Object data )
- {
- if( type == EventType.RECEIVED )
- {
- nextFilter.messageReceived( session, data );
- }
- else if( type == EventType.SENT )
- {
- nextFilter.messageSent( session, data );
- }
- else if( type == EventType.EXCEPTION )
- {
- nextFilter.exceptionCaught( session, ( Throwable ) data );
- }
- else if( type == EventType.IDLE )
- {
- nextFilter.sessionIdle( session, ( IdleStatus ) data );
- }
- else if( type == EventType.OPENED )
- {
- nextFilter.sessionOpened( session );
- }
- else if( type == EventType.CLOSED )
- {
- nextFilter.sessionClosed( session );
- }
- }
-
- private void follow()
- {
- synchronized( promotionLock )
- {
- if( this != leader )
- {
- synchronized( followers )
- {
- followers.push( this );
- }
- }
- }
- }
-
- private void releaseBuffer( SessionBuffer buf )
- {
- synchronized( readySessionBuffers )
- {
- busySessionBuffers.remove( buf );
- if( buf.nextFilters.isEmpty() )
- {
- removeSessionBuffer( buf );
- }
- else
- {
- readySessionBuffers.add( buf );
- }
- }
- }
-
- private boolean waitForPromotion()
- {
- synchronized( promotionLock )
- {
- if( this != leader )
- {
- try
- {
- int keepAliveTime = getKeepAliveTime();
- if( keepAliveTime > 0 )
- {
- promotionLock.wait( keepAliveTime );
- }
- else
- {
- promotionLock.wait();
- }
- }
- catch( InterruptedException e )
- {
- }
- }
-
- boolean timeToLead = this == leader;
-
- if( !timeToLead )
- {
- // time to die
- synchronized( followers )
- {
- followers.remove( this );
- }
- }
-
- return timeToLead;
- }
- }
-
- private void giveUpLead()
- {
- Worker worker;
- synchronized( followers )
- {
- worker = ( Worker ) followers.pop();
- }
-
- if( worker != null )
- {
- worker.lead();
- }
- else
- {
- if( !shuttingDown )
- {
- synchronized( ProtocolThreadPoolFilter.this )
- {
- if( !shuttingDown
- && getPoolSize() < getMaximumPoolSize() )
- {
- worker = new Worker();
- worker.start();
- worker.lead();
- }
- }
- }
- }
+ nextFilter.sessionClosed( session );
}
}
Modified: directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java?rev=164162&r1=164161&r2=164162&view=diff
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java (original)
+++ directory/network/trunk/src/java/org/apache/mina/util/AvailablePortFinder.java Thu Apr 21 19:50:31 2005
@@ -16,7 +16,6 @@
* limitations under the License.
*
*/
-
package org.apache.mina.util;
import java.io.IOException;
Added: directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java?rev=164162&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java Thu Apr 21 19:50:31 2005
@@ -0,0 +1,442 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.mina.common.Session;
+
+/**
+ * A base implementation of Thread-pooling filters.
+ * This filter forwards events to its thread pool. This is an implementation of
+ * <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
+ * thread pool</a> by Douglas C. Schmidt et al.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public abstract class BaseThreadPool implements ThreadPool
+{
+ /**
+ * Default maximum size of thread pool (2G).
+ */
+ public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
+
+ /**
+ * Default keep-alive time of thread pool (1 min).
+ */
+ public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
+
+ private static volatile int threadId = 0;
+
+ private Map buffers = new IdentityHashMap();
+
+ private Stack followers = new Stack();
+
+ private Worker leader;
+
+ private BlockingSet readySessionBuffers = new BlockingSet();
+
+ private Set busySessionBuffers = new HashSet();
+
+ private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
+
+ private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
+
+ private boolean started;
+
+ private boolean shuttingDown;
+
+ private int poolSize;
+
+ private final Object poolSizeLock = new Object();
+
+ /**
+ * Creates a new instance with default thread pool settings.
+ * You must invoke the {@link #start()} method to actually start the threads.
+ */
+ protected BaseThreadPool()
+ {
+ }
+
+ public int getPoolSize()
+ {
+ synchronized( poolSizeLock )
+ {
+ return poolSize;
+ }
+ }
+
+ public int getMaximumPoolSize()
+ {
+ return maximumPoolSize;
+ }
+
+ public int getKeepAliveTime()
+ {
+ return keepAliveTime;
+ }
+
+ public void setMaximumPoolSize( int maximumPoolSize )
+ {
+ if( maximumPoolSize <= 0 )
+ throw new IllegalArgumentException();
+ this.maximumPoolSize = maximumPoolSize;
+ }
+
+ public void setKeepAliveTime( int keepAliveTime )
+ {
+ this.keepAliveTime = keepAliveTime;
+ }
+
+ public synchronized void start()
+ {
+ if( started )
+ return;
+
+ shuttingDown = false;
+
+ leader = new Worker();
+ leader.start();
+ leader.lead();
+
+ started = true;
+ }
+
+ public synchronized void stop()
+ {
+ if( !started )
+ return;
+
+ shuttingDown = true;
+ Worker lastLeader = null;
+ for( ;; )
+ {
+ Worker leader = this.leader;
+ if( lastLeader == leader )
+ break;
+
+ while( leader.isAlive() )
+ {
+ leader.interrupt();
+ try
+ {
+ leader.join();
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+
+ lastLeader = leader;
+ }
+
+ started = false;
+ }
+
+ private void increasePoolSize()
+ {
+ synchronized( poolSizeLock )
+ {
+ poolSize++;
+ }
+ }
+
+ private void decreasePoolSize()
+ {
+ synchronized( poolSizeLock )
+ {
+ poolSize--;
+ }
+ }
+
+ protected void fireEvent( Object nextFilter, Session session,
+ EventType type, Object data )
+ {
+ SessionBuffer buf = getSessionBuffer( session );
+ synchronized( buf )
+ {
+ buf.nextFilters.push( nextFilter );
+ buf.eventTypes.push( type );
+ buf.eventDatum.push( data );
+ }
+
+ synchronized( readySessionBuffers )
+ {
+ if( !busySessionBuffers.contains( buf ) )
+ {
+ busySessionBuffers.add( buf );
+ readySessionBuffers.add( buf );
+ }
+ }
+ }
+
+ /**
+ * Implement this method to forward events to <tt>nextFilter</tt>.
+ */
+ protected abstract void processEvent( Object nextFilter, Session session,
+ EventType type, Object data );
+
+ private SessionBuffer getSessionBuffer( Session session )
+ {
+ SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
+ if( buf == null )
+ {
+ synchronized( buffers )
+ {
+ buf = ( SessionBuffer ) buffers.get( session );
+ if( buf == null )
+ {
+ buf = new SessionBuffer( session );
+ buffers.put( session, buf );
+ }
+ }
+ }
+ return buf;
+ }
+
+ private void removeSessionBuffer( SessionBuffer buf )
+ {
+ synchronized( buffers )
+ {
+ buffers.remove( buf.session );
+ }
+ }
+
+ private static class SessionBuffer
+ {
+ private final Session session;
+
+ private final Queue nextFilters = new Queue();
+
+ private final Queue eventTypes = new Queue();
+
+ private final Queue eventDatum = new Queue();
+
+ private SessionBuffer( Session session )
+ {
+ this.session = session;
+ }
+ }
+
+ private class Worker extends Thread
+ {
+ private final Object promotionLock = new Object();
+
+ private Worker()
+ {
+ super( "IoThreadPool-" + ( threadId++ ) );
+ increasePoolSize();
+ }
+
+ public void lead()
+ {
+ synchronized( promotionLock )
+ {
+ leader = this;
+ promotionLock.notify();
+ }
+ }
+
+ public void run()
+ {
+ for( ;; )
+ {
+ if( !waitForPromotion() )
+ break;
+
+ SessionBuffer buf = fetchBuffer();
+ giveUpLead();
+
+ if( buf == null )
+ {
+ break;
+ }
+
+ processEvents( buf );
+ follow();
+ releaseBuffer( buf );
+ }
+
+ decreasePoolSize();
+ }
+
+ private SessionBuffer fetchBuffer()
+ {
+ SessionBuffer buf = null;
+ synchronized( readySessionBuffers )
+ {
+ do
+ {
+ buf = null;
+ try
+ {
+ readySessionBuffers.waitForNewItem();
+ }
+ catch( InterruptedException e )
+ {
+ break;
+ }
+
+ Iterator it = readySessionBuffers.iterator();
+ if( !it.hasNext() )
+ {
+ // exceeded keepAliveTime
+ break;
+ }
+
+ do
+ {
+ buf = null;
+ buf = ( SessionBuffer ) it.next();
+ it.remove();
+ }
+ while( buf != null && buf.nextFilters.isEmpty()
+ && it.hasNext() );
+ }
+ while( buf != null && buf.nextFilters.isEmpty() );
+ }
+
+ return buf;
+ }
+
+ private void processEvents( SessionBuffer buf )
+ {
+ Session session = buf.session;
+ for( ;; )
+ {
+ Object nextFilter;
+ EventType type;
+ Object data;
+ synchronized( buf )
+ {
+ nextFilter = buf.nextFilters.pop();
+ if( nextFilter == null )
+ break;
+
+ type = ( EventType ) buf.eventTypes.pop();
+ data = buf.eventDatum.pop();
+ }
+ processEvent( nextFilter, session, type, data );
+ }
+ }
+
+ private void follow()
+ {
+ synchronized( promotionLock )
+ {
+ if( this != leader )
+ {
+ synchronized( followers )
+ {
+ followers.push( this );
+ }
+ }
+ }
+ }
+
+ private void releaseBuffer( SessionBuffer buf )
+ {
+ synchronized( readySessionBuffers )
+ {
+ busySessionBuffers.remove( buf );
+ if( buf.nextFilters.isEmpty() )
+ {
+ removeSessionBuffer( buf );
+ }
+ else
+ {
+ readySessionBuffers.add( buf );
+ }
+ }
+ }
+
+ private boolean waitForPromotion()
+ {
+ synchronized( promotionLock )
+ {
+ if( this != leader )
+ {
+ try
+ {
+ int keepAliveTime = getKeepAliveTime();
+ if( keepAliveTime > 0 )
+ {
+ promotionLock.wait( keepAliveTime );
+ }
+ else
+ {
+ promotionLock.wait();
+ }
+ }
+ catch( InterruptedException e )
+ {
+ }
+ }
+
+ boolean timeToLead = this == leader;
+
+ if( !timeToLead )
+ {
+ // time to die
+ synchronized( followers )
+ {
+ followers.remove( this );
+ }
+ }
+
+ return timeToLead;
+ }
+ }
+
+ private void giveUpLead()
+ {
+ Worker worker;
+ synchronized( followers )
+ {
+ worker = ( Worker ) followers.pop();
+ }
+
+ if( worker != null )
+ {
+ worker.lead();
+ }
+ else
+ {
+ if( !shuttingDown )
+ {
+ synchronized( BaseThreadPool.this )
+ {
+ if( !shuttingDown
+ && getPoolSize() < getMaximumPoolSize() )
+ {
+ worker = new Worker();
+ worker.start();
+ worker.lead();
+ }
+ }
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
Propchange: directory/network/trunk/src/java/org/apache/mina/util/BaseThreadPool.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision
Added: directory/network/trunk/src/java/org/apache/mina/util/EventType.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/EventType.java?rev=164162&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/EventType.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/EventType.java Thu Apr 21 19:50:31 2005
@@ -0,0 +1,50 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * Enumeration for MINA event types.
+ * Used by {@link ThreadPool}s when they push events to event queue.
+ * <p>
+ * This is a typesafe enumeration: the only instances are the public
+ * constants declared here (the constructor is private), so instances can
+ * be compared with <code>==</code>.  {@link #toString()} returns the name
+ * of the constant, which makes log and debugger output readable.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public final class EventType
+{
+    public static final EventType OPENED = new EventType( "OPENED" );
+
+    public static final EventType CLOSED = new EventType( "CLOSED" );
+
+    public static final EventType READ = new EventType( "READ" );
+
+    public static final EventType WRITTEN = new EventType( "WRITTEN" );
+
+    public static final EventType RECEIVED = new EventType( "RECEIVED" );
+
+    public static final EventType SENT = new EventType( "SENT" );
+
+    public static final EventType IDLE = new EventType( "IDLE" );
+
+    public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+    /** Name of the constant, as returned by {@link #toString()}. */
+    private final String name;
+
+    /**
+     * Creates a new event type.  Private so that no instances other than
+     * the constants above can exist.
+     *
+     * @param name the name of the constant being created
+     */
+    private EventType( String name )
+    {
+        this.name = name;
+    }
+
+    /**
+     * Returns the name of this event type, identical to the name of the
+     * constant that holds it (for example <code>"OPENED"</code>).
+     */
+    public String toString()
+    {
+        return name;
+    }
+}
\ No newline at end of file
Added: directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java
URL: http://svn.apache.org/viewcvs/directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java?rev=164162&view=auto
==============================================================================
--- directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java (added)
+++ directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java Thu Apr 21 19:50:31 2005
@@ -0,0 +1,66 @@
+/*
+ * @(#) $Id$
+ *
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.mina.util;
+
+/**
+ * A generic thread pool interface.
+ *
+ * @author The Apache Directory Project (dev@directory.apache.org)
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$
+ */
+public interface ThreadPool {
+
+ /**
+ * Returns the number of threads in the thread pool.
+ */
+ int getPoolSize();
+
+ /**
+ * Returns the maximum size of the thread pool.
+ */
+ int getMaximumPoolSize();
+
+ /**
+ * Returns the keep-alive time until the thread suicides after it became
+ * idle (milliseconds unit).
+ */
+ int getKeepAliveTime();
+
+ /**
+ * Sets the maximum size of the thread pool.
+ */
+ void setMaximumPoolSize( int maximumPoolSize );
+
+ /**
+ * Sets the keep-alive time until the thread suicides after it became idle
+ * (milliseconds unit).
+ */
+ void setKeepAliveTime( int keepAliveTime );
+
+ /**
+ * Starts thread pool threads and starts forwarding events to them.
+ */
+ void start();
+
+ /**
+ * Stops all thread pool threads.
+ */
+ void stop();
+}
Propchange: directory/network/trunk/src/java/org/apache/mina/util/ThreadPool.java
------------------------------------------------------------------------------
svn:keywords = HeadURL Id LastChangedBy LastChangedDate LastChangedRevision