You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@avalon.apache.org by bl...@apache.org on 2003/01/27 20:54:54 UTC
cvs commit: jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command AbstractThreadManager.java DefaultThreadManager.java TPCThreadManager.java
bloritsch 2003/01/27 11:54:54
Modified: event/src/java/org/apache/excalibur/event/command
AbstractThreadManager.java
DefaultThreadManager.java TPCThreadManager.java
Log:
use util.concurrent and simplify the ThreadManager
Revision Changes Path
1.15 +52 -214 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/AbstractThreadManager.java
Index: AbstractThreadManager.java
===================================================================
RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/AbstractThreadManager.java,v
retrieving revision 1.14
retrieving revision 1.15
diff -u -r1.14 -r1.15
--- AbstractThreadManager.java 27 Jan 2003 18:26:04 -0000 1.14
+++ AbstractThreadManager.java 27 Jan 2003 19:54:54 -0000 1.15
@@ -49,19 +49,17 @@
*/
package org.apache.excalibur.event.command;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
+
+import org.apache.commons.collections.StaticBucketMap;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.excalibur.event.EventHandler;
import org.apache.excalibur.event.Source;
-import org.apache.excalibur.thread.ThreadControl;
-import org.apache.excalibur.thread.ThreadPool;
-import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
/**
* Abstract base class for a ThreadManager that has a single ThreadPool for
@@ -73,20 +71,11 @@
public abstract class AbstractThreadManager extends AbstractLogEnabled
implements Runnable, ThreadManager, Initializable, Disposable
{
- /** The Mutex used in this ThreadManager */
- private final ReentrantLock m_mutex = new ReentrantLock();
-
/** The pipelines we are managing */
- private final HashMap m_pipelines = new HashMap();
-
- /** The controls we have */
- private final LinkedList m_controls = new LinkedList();
+ private final StaticBucketMap m_pipelines = new StaticBucketMap();
/** The ThreadPool we are using */
- private ThreadPool m_threadPool;
-
- /** The ThreadControl for the ThreadManager itself */
- private ThreadControl m_threadControl;
+ private Executor m_executor;
/** Whether we are done or not */
private volatile boolean m_done = false;
@@ -114,19 +103,27 @@
}
/**
+ * Get the current amount of sleep time.
+ */
+ protected long getSleepTime()
+ {
+ return m_sleepTime;
+ }
+
+ /**
* Set the ThreadPool we are using
*
* @param threadPool The ThreadPool
*/
- protected void setThreadPool( ThreadPool threadPool )
+ protected void setExecutor( Executor executor )
{
- if( null == m_threadPool )
+ if( null == m_executor )
{
- m_threadPool = threadPool;
+ m_executor = executor;
}
else
{
- throw new IllegalStateException( "Can only set thread pool once" );
+ throw new IllegalStateException( "Can only set the executor once" );
}
}
@@ -137,12 +134,12 @@
*/
public void initialize() throws Exception
{
- if( null == m_threadPool )
+ if( null == m_executor )
{
throw new IllegalStateException( "No thread pool set" );
}
- this.m_threadControl = this.m_threadPool.execute( this );
+ m_executor.execute( this );
this.m_initialized = true;
}
@@ -161,26 +158,18 @@
try
{
- m_mutex.acquire();
+ PipelineRunner runner = new PipelineRunner( pipeline );
+ runner.enableLogging( getLogger() );
+ m_pipelines.put( pipeline, runner );
- try
+ if( m_done )
{
- PipelineRunner runner = new PipelineRunner( pipeline );
- runner.enableLogging( getLogger() );
- m_pipelines.put( pipeline, runner );
-
- if( m_done )
- {
- m_threadControl = m_threadPool.execute( this );
- }
- }
- finally
- {
- m_mutex.release();
+ m_executor.execute( this );
}
}
catch( InterruptedException ie )
{
+ getLogger().warn("Caught InterruptedException in register", ie);
// ignore for now
}
}
@@ -198,36 +187,11 @@
+ "deregistering a pipeline" );
}
- try
- {
- m_mutex.acquire();
-
- m_pipelines.remove( pipeline );
+ m_pipelines.remove( pipeline );
- if( m_pipelines.isEmpty() )
- {
- m_done = true;
- }
- }
- catch (InterruptedException ie)
+ if( m_pipelines.isEmpty() )
{
- getLogger().warn( "deregister(" + pipeline + ") threw an InterruptedException", ie );
- }
- finally
- {
- m_mutex.release();
- }
-
- if( m_done )
- {
- try
- {
- m_threadControl.join( 1000 );
- }
- catch(InterruptedException ie)
- {
- getLogger().warn( "deregister(" + pipeline + ") threw an InterruptedException", ie );
- }
+ m_done = true;
}
}
@@ -242,55 +206,17 @@
+ "before deregistering pipelines" );
}
- try
- {
- // Aquire mutex to clear pipelines and set the m_done flag
- m_mutex.acquire();
-
- m_pipelines.clear();
-
- m_done = true;
- Iterator it = m_controls.iterator();
-
- while( it.hasNext() )
- {
- ThreadControl tc = (ThreadControl) it.next();
-
- try
- {
- tc.join( 1000 );
- }
- catch (InterruptedException e)
- {
- tc.interrupt();
- }
-
- getLogger().debug("disposed of another ThreadControl");
- }
-
- if ( ! m_pipelines.isEmpty() )
- {
- throw new IllegalStateException("We still have pipelines, but no runners are available!");
- }
-
- }
- catch (InterruptedException ie)
- {
- getLogger().warn( "deregisterAl() threw an InterruptedException", ie );
- }
- finally
+ Iterator it = m_pipelines.keySet().iterator();
+ while ( it.hasNext() )
{
- // C.K. We must release the mutex to give the manager thread a chance to terminate.
- m_mutex.release();
+ deregister( (EventPipeline) it.next() );
}
- try
- {
- m_threadControl.join( 1000 );
- }
- catch (InterruptedException ie)
+ m_done = true;
+
+ if ( ! m_pipelines.isEmpty() )
{
- getLogger().warn( "deregisterAl() threw an InterruptedException", ie );
+ throw new IllegalStateException("We still have pipelines, but no runners are available!");
}
}
@@ -303,133 +229,45 @@
deregisterAll();
doDispose();
-
- if( m_threadControl != null && !m_threadControl.isFinished() )
- {
- if( getLogger().isErrorEnabled() )
- {
- getLogger().error( "The ThreadManager management thread is still active." );
- }
- }
-
- m_threadControl = null;
}
protected void doDispose() {} // default impl to work with released code
/**
- * Return the thread controlls of all active threads
- * (excluding the ThreadManager management thread)
- */
- protected ThreadControl[] getThreadControls()
- {
- try
- {
- m_mutex.acquire();
- return ( ThreadControl[] ) m_controls.toArray( new ThreadControl[0] );
- }
- catch( InterruptedException ie )
- {
- return new ThreadControl[0];
- }
- finally
- {
- m_mutex.release();
- }
- }
-
- /**
* The code that is run in the background to manage the ThreadPool and the
* EventPipelines
*/
public void run()
{
- try
+ while( !m_done )
{
- while( !m_done )
- {
- try
- {
- m_mutex.acquire();
-
- Iterator i = m_pipelines.values().iterator();
-
- while( i.hasNext() )
- {
- PipelineRunner nextRunner = ( PipelineRunner ) i.next();
- ThreadControl control = null;
+ Iterator i = m_pipelines.values().iterator();
- while( control == null )
- {
- try
- {
- control = m_threadPool.execute( nextRunner );
- }
- catch( IllegalStateException e )
- {
- // that's the way ResourceLimitingThreadPool reports
- // that it has no threads available, will still try
- // to go on, hopefully at one point there will be
- // a thread to execute our runner
-
- if( getLogger().isWarnEnabled() )
- {
- getLogger().warn( "Unable to execute pipeline (If out of threads, "
- + "increase block-timeout or number of threads "
- + "per processor", e );
- }
- }
-
- if (getLogger().isDebugEnabled())
- {
- getLogger().debug( "Waiting on " + control );
- }
- }
+ while( i.hasNext() )
+ {
+ PipelineRunner nextRunner = ( PipelineRunner ) i.next();
- m_controls.add( control );
- }
- }
- finally
+ try
{
- m_mutex.release();
+ m_executor.execute( nextRunner );
}
-
- Thread.sleep( m_sleepTime );
-
- try
+ catch( Exception e )
{
- m_mutex.acquire();
-
- Iterator it = m_controls.iterator();
-
- while( it.hasNext() )
+ if( getLogger().isErrorEnabled() )
{
- ThreadControl control = ( ThreadControl ) it.next();
- if( control.isFinished() )
- {
- it.remove();
- }
+ getLogger().error( "Caught exception in ThreadManager management thread", e );
}
}
- finally
- {
- m_mutex.release();
- }
}
- }
- catch( InterruptedException e )
- {
- Thread.interrupted();
- }
- catch( RuntimeException e )
- {
- if( getLogger().isFatalErrorEnabled() )
+
+ try
+ {
+ Thread.sleep( m_sleepTime );
+ }
+ catch( InterruptedException e )
{
- getLogger().fatalError( "ThreadManager management thread aborting "
- + " due to exception", e );
+ Thread.interrupted();
}
-
- throw e;
}
}
1.5 +3 -3 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/DefaultThreadManager.java
Index: DefaultThreadManager.java
===================================================================
RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/DefaultThreadManager.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- DefaultThreadManager.java 30 Sep 2002 16:17:01 -0000 1.4
+++ DefaultThreadManager.java 27 Jan 2003 19:54:54 -0000 1.5
@@ -49,7 +49,7 @@
*/
package org.apache.excalibur.event.command;
-import org.apache.excalibur.thread.ThreadPool;
+import EDU.oswego.cs.dl.util.concurrent.Executor;
/**
* A ThreadManager that will use an external ThreadPool. This will be useful
@@ -68,8 +68,8 @@
*
* @param pool The ThreadPool we will use.
*/
- public DefaultThreadManager( final ThreadPool pool )
+ public DefaultThreadManager( final Executor executor )
{
- setThreadPool( pool );
+ setExecutor( executor );
}
}
1.35 +26 -14 jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java
Index: TPCThreadManager.java
===================================================================
RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -r1.34 -r1.35
--- TPCThreadManager.java 27 Jan 2003 18:26:04 -0000 1.34
+++ TPCThreadManager.java 27 Jan 2003 19:54:54 -0000 1.35
@@ -53,9 +53,10 @@
import org.apache.avalon.framework.parameters.ParameterException;
import org.apache.avalon.framework.parameters.Parameterizable;
import org.apache.avalon.framework.parameters.Parameters;
-import org.apache.excalibur.thread.ThreadControl;
import org.apache.excalibur.util.SystemUtil;
+import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
+
/**
* This is a ThreadManager that uses a certain number of threads per
* processor. The number of threads in the pool is a direct proportion to
@@ -67,10 +68,11 @@
*/
public final class TPCThreadManager extends AbstractThreadManager implements Parameterizable
{
- private EventThreadPool m_tpool;
+ private PooledExecutor m_threadPool;
private long m_blockTimeout = 1000L;
private int m_processors = -1;
private int m_threadsPerProcessor = 1;
+ private boolean m_hardShutdown = false;
/**
* The following parameters can be set for this class:
@@ -118,6 +120,8 @@
setSleepTime( parameters.getParameterAsLong( "sleep-time", 1000L ) );
this.m_blockTimeout = parameters.getParameterAsLong( "block-timeout", 1000L );
+
+ this.m_hardShutdown = ( parameters.getParameterAsBoolean( "force-shutdown", false ) );
}
public void initialize() throws Exception
@@ -132,32 +136,40 @@
throw new IllegalStateException( "ThreadManager is already initailized" );
}
- m_tpool = new EventThreadPool( "TPCThreadManager",
- ( m_processors * m_threadsPerProcessor ) + 1, ( int ) m_blockTimeout );
+ m_threadPool = new PooledExecutor(( m_processors * m_threadsPerProcessor ) + 1);
+ m_threadPool.setMinimumPoolSize( 2 ); // at least two threads
+ m_threadPool.setKeepAliveTime( getSleepTime() );
+ m_threadPool.waitWhenBlocked();
if( null == getLogger() )
{
this.enableLogging( new NullLogger() );
}
- setThreadPool( m_tpool );
+ setExecutor( m_threadPool );
super.initialize();
}
protected final void doDispose()
{
- // We should dispose all active threads
- final ThreadControl[] threads = getThreadControls();
-
- for( int i = 0; i < threads.length; i++ )
+ if ( m_hardShutdown )
+ {
+ m_threadPool.shutdownNow();
+ }
+ else
{
- if( !threads[i].isFinished() )
- {
- m_tpool.dispose( threads[i] );
- }
+ m_threadPool.shutdownAfterProcessingCurrentlyQueuedTasks();
}
- m_tpool.dispose();
+ try
+ {
+ m_threadPool.awaitTerminationAfterShutdown( getSleepTime() );
+ }
+ catch (InterruptedException ie)
+ {
+ getLogger().warn("Thread pool took longer than " + getSleepTime() +
+ " ms to shut down", ie);
+ }
}
}
--
To unsubscribe, e-mail: <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>