You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@avalon.apache.org by bl...@apache.org on 2003/01/27 20:54:54 UTC

cvs commit: jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command AbstractThreadManager.java DefaultThreadManager.java TPCThreadManager.java

bloritsch    2003/01/27 11:54:54

  Modified:    event/src/java/org/apache/excalibur/event/command
                        AbstractThreadManager.java
                        DefaultThreadManager.java TPCThreadManager.java
  Log:
  use util.concurrent and simplify the ThreadManager
  
  Revision  Changes    Path
  1.15      +52 -214   jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/AbstractThreadManager.java
  
  Index: AbstractThreadManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/AbstractThreadManager.java,v
  retrieving revision 1.14
  retrieving revision 1.15
  diff -u -r1.14 -r1.15
  --- AbstractThreadManager.java	27 Jan 2003 18:26:04 -0000	1.14
  +++ AbstractThreadManager.java	27 Jan 2003 19:54:54 -0000	1.15
  @@ -49,19 +49,17 @@
   */
   package org.apache.excalibur.event.command;
   
  -import java.util.HashMap;
   import java.util.Iterator;
  -import java.util.LinkedList;
  +
  +import org.apache.commons.collections.StaticBucketMap;
   
   import org.apache.avalon.framework.activity.Disposable;
   import org.apache.avalon.framework.activity.Initializable;
   import org.apache.avalon.framework.logger.AbstractLogEnabled;
   import org.apache.excalibur.event.EventHandler;
   import org.apache.excalibur.event.Source;
  -import org.apache.excalibur.thread.ThreadControl;
  -import org.apache.excalibur.thread.ThreadPool;
   
  -import EDU.oswego.cs.dl.util.concurrent.ReentrantLock;
  +import EDU.oswego.cs.dl.util.concurrent.Executor;
   
   /**
    * Abstract base class for a ThreadManager that has a single ThreadPool for
  @@ -73,20 +71,11 @@
   public abstract class AbstractThreadManager extends AbstractLogEnabled
       implements Runnable, ThreadManager, Initializable, Disposable
   {
  -    /** The Mutex used in this ThreadManager */
  -    private final ReentrantLock m_mutex = new ReentrantLock();
  -
       /** The pipelines we are managing */
  -    private final HashMap m_pipelines = new HashMap();
  -
  -    /** The controls we have */
  -    private final LinkedList m_controls = new LinkedList();
  +    private final StaticBucketMap m_pipelines = new StaticBucketMap();
   
       /** The ThreadPool we are using */
  -    private ThreadPool m_threadPool;
  -
  -    /** The ThreadControl for the ThreadManager itself */
  -    private ThreadControl m_threadControl;
  +    private Executor m_executor;
   
       /** Whether we are done or not */
       private volatile boolean m_done = false;
  @@ -114,19 +103,27 @@
       }
   
       /**
  +     * Get the current amount of sleep time.
  +     */
  +    protected long getSleepTime()
  +    {
  +        return m_sleepTime;
  +    }
  +
  +    /**
        * Set the ThreadPool we are using
        *
        * @param threadPool  The ThreadPool
        */
  -    protected void setThreadPool( ThreadPool threadPool )
  +    protected void setExecutor( Executor executor )
       {
  -        if( null == m_threadPool )
  +        if( null == m_executor )
           {
  -            m_threadPool = threadPool;
  +            m_executor = executor;
           }
           else
           {
  -            throw new IllegalStateException( "Can only set thread pool once" );
  +            throw new IllegalStateException( "Can only set the executor once" );
           }
       }
   
  @@ -137,12 +134,12 @@
        */
       public void initialize() throws Exception
       {
  -        if( null == m_threadPool )
  +        if( null == m_executor )
           {
               throw new IllegalStateException( "No thread pool set" );
           }
   
  -        this.m_threadControl = this.m_threadPool.execute( this );
  +        m_executor.execute( this );
           this.m_initialized = true;
       }
   
  @@ -161,26 +158,18 @@
   
           try
           {
  -            m_mutex.acquire();
  +            PipelineRunner runner = new PipelineRunner( pipeline );
  +            runner.enableLogging( getLogger() );
  +            m_pipelines.put( pipeline, runner );
   
  -            try
  +            if( m_done )
               {
  -                PipelineRunner runner = new PipelineRunner( pipeline );
  -                runner.enableLogging( getLogger() );
  -                m_pipelines.put( pipeline, runner );
  -
  -                if( m_done )
  -                {
  -                    m_threadControl = m_threadPool.execute( this );
  -                }
  -            }
  -            finally
  -            {
  -                m_mutex.release();
  +                m_executor.execute( this );
               }
           }
           catch( InterruptedException ie )
           {
  +            getLogger().warn("Caught InterruptedException in register", ie);
               // ignore for now
           }
       }
  @@ -198,36 +187,11 @@
                                                + "deregistering a pipeline" );
           }
   
  -        try
  -        {
  -            m_mutex.acquire();
  -
  -            m_pipelines.remove( pipeline );
  +        m_pipelines.remove( pipeline );
   
  -            if( m_pipelines.isEmpty() )
  -            {
  -                m_done = true;
  -            }
  -        }
  -        catch (InterruptedException ie)
  +        if( m_pipelines.isEmpty() )
           {
  -            getLogger().warn( "deregister(" + pipeline + ") threw an InterruptedException", ie );
  -        }
  -        finally
  -        {
  -            m_mutex.release();
  -        }
  -
  -        if( m_done )
  -        {
  -            try
  -            {
  -                m_threadControl.join( 1000 );
  -            }
  -            catch(InterruptedException ie)
  -            {
  -                getLogger().warn( "deregister(" + pipeline + ") threw an InterruptedException", ie );
  -            }
  +            m_done = true;
           }
       }
   
  @@ -242,55 +206,17 @@
                                                + "before deregistering pipelines" );
           }
   
  -        try
  -        {
  -            // Aquire mutex to clear pipelines and set the m_done flag
  -            m_mutex.acquire();
  -
  -            m_pipelines.clear();
  -
  -            m_done = true;
  -            Iterator it = m_controls.iterator();
  -
  -            while( it.hasNext() )
  -            {
  -                ThreadControl tc = (ThreadControl) it.next();
  -
  -                try
  -                {
  -                    tc.join( 1000 );
  -                }
  -                catch (InterruptedException e)
  -                {
  -                    tc.interrupt();
  -                }
  -
  -                getLogger().debug("disposed of another ThreadControl");
  -            }
  -
  -            if ( ! m_pipelines.isEmpty() )
  -            {
  -                throw new IllegalStateException("We still have pipelines, but no runners are available!");
  -            }
  -
  -        }
  -        catch (InterruptedException ie)
  -        {
  -            getLogger().warn( "deregisterAl() threw an InterruptedException", ie );
  -        }
  -        finally
  +        Iterator it = m_pipelines.keySet().iterator();
  +        while ( it.hasNext() )
           {
  -            //  C.K. We must release the mutex to give the manager thread a chance to terminate.
  -            m_mutex.release();
  +            deregister( (EventPipeline) it.next() );
           }
   
  -        try
  -        {
  -            m_threadControl.join( 1000 );
  -        }
  -        catch (InterruptedException ie)
  +        m_done = true;
  +
  +        if ( ! m_pipelines.isEmpty() )
           {
  -            getLogger().warn( "deregisterAl() threw an InterruptedException", ie );
  +            throw new IllegalStateException("We still have pipelines, but no runners are available!");
           }
       }
   
  @@ -303,133 +229,45 @@
           deregisterAll();
   
           doDispose();
  -
  -        if( m_threadControl != null && !m_threadControl.isFinished() )
  -        {
  -            if( getLogger().isErrorEnabled() )
  -            {
  -                getLogger().error( "The ThreadManager management thread is still active." );
  -            }
  -        }
  -
  -        m_threadControl = null;
       }
   
       protected void doDispose() {} // default impl to work with released code
   
       /**
  -     * Return the thread controlls of all active threads
  -     * (excluding the ThreadManager management thread)
  -     */
  -    protected ThreadControl[] getThreadControls()
  -    {
  -        try
  -        {
  -            m_mutex.acquire();
  -            return ( ThreadControl[] ) m_controls.toArray( new ThreadControl[0] );
  -        }
  -        catch( InterruptedException ie )
  -        {
  -            return new ThreadControl[0];
  -        }
  -        finally
  -        {
  -            m_mutex.release();
  -        }
  -    }
  -
  -    /**
        * The code that is run in the background to manage the ThreadPool and the
        * EventPipelines
        */
       public void run()
       {
  -        try
  +        while( !m_done )
           {
  -            while( !m_done )
  -            {
  -                try
  -                {
  -                    m_mutex.acquire();
  -
  -                    Iterator i = m_pipelines.values().iterator();
  -
  -                    while( i.hasNext() )
  -                    {
  -                        PipelineRunner nextRunner = ( PipelineRunner ) i.next();
  -                        ThreadControl control = null;
  +            Iterator i = m_pipelines.values().iterator();
   
  -                        while( control == null )
  -                        {
  -                            try
  -                            {
  -                                control = m_threadPool.execute( nextRunner );
  -                            }
  -                            catch( IllegalStateException e )
  -                            {
  -                                // that's the way ResourceLimitingThreadPool reports
  -                                // that it has no threads available, will still try
  -                                // to go on, hopefully at one point there will be
  -                                // a thread to execute our runner
  -
  -                                if( getLogger().isWarnEnabled() )
  -                                {
  -                                    getLogger().warn( "Unable to execute pipeline (If out of threads, "
  -                                                      + "increase block-timeout or number of threads "
  -                                                      + "per processor", e );
  -                                }
  -                            }
  -
  -                            if (getLogger().isDebugEnabled())
  -                            {
  -                                getLogger().debug( "Waiting on " + control );
  -                            }
  -                        }
  +            while( i.hasNext() )
  +            {
  +                PipelineRunner nextRunner = ( PipelineRunner ) i.next();
   
  -                        m_controls.add( control );
  -                    }
  -                }
  -                finally
  +                try
                   {
  -                    m_mutex.release();
  +                    m_executor.execute( nextRunner );
                   }
  -
  -                Thread.sleep( m_sleepTime );
  -
  -                try
  +                catch( Exception e )
                   {
  -                    m_mutex.acquire();
  -
  -                    Iterator it = m_controls.iterator();
  -
  -                    while( it.hasNext() )
  +                    if( getLogger().isErrorEnabled() )
                       {
  -                        ThreadControl control = ( ThreadControl ) it.next();
  -                        if( control.isFinished() )
  -                        {
  -                            it.remove();
  -                        }
  +                        getLogger().error( "Caught exception in ThreadManager management thread", e );
                       }
                   }
  -                finally
  -                {
  -                    m_mutex.release();
  -                }
               }
  -        }
  -        catch( InterruptedException e )
  -        {
  -            Thread.interrupted();
  -        }
  -        catch( RuntimeException e )
  -        {
  -            if( getLogger().isFatalErrorEnabled() )
  +
  +            try
  +            {
  +                Thread.sleep( m_sleepTime );
  +            }
  +            catch( InterruptedException e )
               {
  -                getLogger().fatalError( "ThreadManager management thread aborting "
  -                                        + " due to exception", e );
  +                Thread.interrupted();
               }
  -
  -            throw e;
           }
       }
   
  
  
  
  1.5       +3 -3      jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/DefaultThreadManager.java
  
  Index: DefaultThreadManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/DefaultThreadManager.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DefaultThreadManager.java	30 Sep 2002 16:17:01 -0000	1.4
  +++ DefaultThreadManager.java	27 Jan 2003 19:54:54 -0000	1.5
  @@ -49,7 +49,7 @@
   */
   package org.apache.excalibur.event.command;
   
  -import org.apache.excalibur.thread.ThreadPool;
  +import EDU.oswego.cs.dl.util.concurrent.Executor;
   
   /**
    * A ThreadManager that will use an external ThreadPool. This will be useful
  @@ -68,8 +68,8 @@
        *
        * @param pool  The ThreadPool we will use.
        */
  -    public DefaultThreadManager( final ThreadPool pool )
  +    public DefaultThreadManager( final Executor executor )
       {
  -        setThreadPool( pool );
  +        setExecutor( executor );
       }
   }
  
  
  
  1.35      +26 -14    jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java
  
  Index: TPCThreadManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v
  retrieving revision 1.34
  retrieving revision 1.35
  diff -u -r1.34 -r1.35
  --- TPCThreadManager.java	27 Jan 2003 18:26:04 -0000	1.34
  +++ TPCThreadManager.java	27 Jan 2003 19:54:54 -0000	1.35
  @@ -53,9 +53,10 @@
   import org.apache.avalon.framework.parameters.ParameterException;
   import org.apache.avalon.framework.parameters.Parameterizable;
   import org.apache.avalon.framework.parameters.Parameters;
  -import org.apache.excalibur.thread.ThreadControl;
   import org.apache.excalibur.util.SystemUtil;
   
  +import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
  +
   /**
    * This is a ThreadManager that uses a certain number of threads per
    * processor.  The number of threads in the pool is a direct proportion to
  @@ -67,10 +68,11 @@
    */
   public final class TPCThreadManager extends AbstractThreadManager implements Parameterizable
   {
  -    private EventThreadPool m_tpool;
  +    private PooledExecutor m_threadPool;
       private long m_blockTimeout = 1000L;
       private int m_processors = -1;
       private int m_threadsPerProcessor = 1;
  +    private boolean m_hardShutdown = false;
   
       /**
        * The following parameters can be set for this class:
  @@ -118,6 +120,8 @@
   
           setSleepTime( parameters.getParameterAsLong( "sleep-time", 1000L ) );
           this.m_blockTimeout = parameters.getParameterAsLong( "block-timeout", 1000L );
  +
  +        this.m_hardShutdown = ( parameters.getParameterAsBoolean( "force-shutdown", false ) );
       }
   
       public void initialize() throws Exception
  @@ -132,32 +136,40 @@
               throw new IllegalStateException( "ThreadManager is already initailized" );
           }
   
  -        m_tpool = new EventThreadPool( "TPCThreadManager",
  -                                       ( m_processors * m_threadsPerProcessor ) + 1, ( int ) m_blockTimeout );
  +        m_threadPool = new PooledExecutor(( m_processors * m_threadsPerProcessor ) + 1);
  +        m_threadPool.setMinimumPoolSize( 2 ); // at least two threads
  +        m_threadPool.setKeepAliveTime( getSleepTime() );
  +        m_threadPool.waitWhenBlocked();
   
           if( null == getLogger() )
           {
               this.enableLogging( new NullLogger() );
           }
   
  -        setThreadPool( m_tpool );
  +        setExecutor( m_threadPool );
   
           super.initialize();
       }
   
       protected final void doDispose()
       {
  -        // We should dispose all active threads
  -        final ThreadControl[] threads = getThreadControls();
  -
  -        for( int i = 0; i < threads.length; i++ )
  +        if ( m_hardShutdown )
  +        {
  +            m_threadPool.shutdownNow();
  +        }
  +        else
           {
  -            if( !threads[i].isFinished() )
  -            {
  -                m_tpool.dispose( threads[i] );
  -            }
  +            m_threadPool.shutdownAfterProcessingCurrentlyQueuedTasks();
           }
   
  -        m_tpool.dispose();
  +        try
  +        {
  +            m_threadPool.awaitTerminationAfterShutdown( getSleepTime() );
  +        }
  +        catch (InterruptedException ie)
  +        {
  +            getLogger().warn("Thread pool took longer than " + getSleepTime() +
  +                 " ms to shut down", ie);
  +        }
       }
   }
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>