You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@avalon.apache.org by pr...@apache.org on 2002/07/16 23:02:12 UTC

cvs commit: jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command AbstractThreadManager.java DefaultThreadManager.java TPCThreadManager.java

proyal      2002/07/16 14:02:11

  Modified:    event/src/java/org/apache/excalibur/event/command
                        TPCThreadManager.java
  Added:       event/src/java/org/apache/excalibur/event/command
                        AbstractThreadManager.java
                        DefaultThreadManager.java
  Log:
  * Extracted superclass from TPCThreadManager that has everything
  but a ThreadPool
  * Created new DefaultThreadManager which takes a
  ThreadPool in the constructor
  
  Revision  Changes    Path
  1.21      +10 -228   jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java
  
  Index: TPCThreadManager.java
  ===================================================================
  RCS file: /home/cvs/jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/TPCThreadManager.java,v
  retrieving revision 1.20
  retrieving revision 1.21
  diff -u -r1.20 -r1.21
  --- TPCThreadManager.java	3 Jun 2002 21:21:07 -0000	1.20
  +++ TPCThreadManager.java	16 Jul 2002 21:02:10 -0000	1.21
  @@ -33,24 +33,12 @@
    * @author <a href="mailto:bloritsch@apache.org">Berin Loritsch</a>
    * @author <a href="mailto:proyal@apache.org">Peter Royal</a>
    */
  -public final class TPCThreadManager extends AbstractLogEnabled
  -  implements Runnable, ThreadManager, Parameterizable, Initializable, Disposable
  +public final class TPCThreadManager extends AbstractThreadManager implements Parameterizable
   {
  -    private final Mutex m_mutex = new Mutex();
  -    private final HashMap m_pipelines = new HashMap();
  -
  -    private ThreadPool m_threadPool;
  -    private ThreadControl m_threadControl;
  -    private boolean m_done = false;
  -
  -    //Set reasonable defaults in case parameterize() is never called.
  -    private long m_sleepTime = 1000L;
       private long m_blockTimeout = 1000L;
       private int m_processors = SystemUtil.numProcessors();
       private int m_threadsPerProcessor = 1;
   
  -    private boolean m_initialized = false;
  -
       /**
        * The following parameters can be set for this class:
        *
  @@ -83,16 +71,19 @@
       public void parameterize( Parameters parameters ) throws ParameterException
       {
           this.m_processors =
  -          Math.max( parameters.getParameterAsInteger( "processors", SystemUtil.numProcessors() ), 1 );
  +          Math.max( parameters.getParameterAsInteger( "processors", SystemUtil.numProcessors() ),
  +                    1 );
   
  -        this.m_threadsPerProcessor = Math.max( parameters.getParameterAsInteger( "threads-per-processor", 1 ), 1 );
  -        this.m_sleepTime = parameters.getParameterAsLong( "sleep-time", 1000L );
  +        this.m_threadsPerProcessor =
  +            Math.max( parameters.getParameterAsInteger( "threads-per-processor", 1 ), 1 );
  +
  +        setSleepTime( parameters.getParameterAsLong( "sleep-time", 1000L ) );
           this.m_blockTimeout = parameters.getParameterAsLong( "block-timeout", 1000L );
       }
   
       public void initialize() throws Exception
       {
  -        if( this.m_initialized )
  +        if( isInitialized() )
           {
               throw new IllegalStateException( "ThreadManager is already initailized" );
           }
  @@ -112,217 +103,8 @@
   
           tpool.enableLogging( getLogger() );
   
  -        this.m_threadPool = tpool;
  -        this.m_threadControl = this.m_threadPool.execute( this );
  -        this.m_initialized = true;
  -    }
  -
  -    /**
  -     * Register an EventPipeline with the ThreadManager.
  -     */
  -    public void register( EventPipeline pipeline )
  -    {
  -        if( !this.m_initialized )
  -        {
  -            throw new IllegalStateException( "ThreadManager must be initialized before registering a pipeline" );
  -        }
  -
  -        try
  -        {
  -            m_mutex.acquire();
  -
  -            try
  -            {
  -                PipelineRunner runner = new PipelineRunner( pipeline );
  -                runner.enableLogging( getLogger() );
  -                m_pipelines.put( pipeline, runner );
  -
  -                if( m_done )
  -                {
  -                    m_threadControl = m_threadPool.execute( this );
  -                }
  -            }
  -            finally
  -            {
  -                m_mutex.release();
  -            }
  -        }
  -        catch( InterruptedException ie )
  -        {
  -            // ignore for now
  -        }
  -    }
  -
  -    /**
  -     * Deregister an EventPipeline with the ThreadManager
  -     */
  -    public void deregister( EventPipeline pipeline )
  -    {
  -        if( !this.m_initialized )
  -        {
  -            throw new IllegalStateException( "ThreadManager must be initialized before deregistering a pipeline" );
  -        }
  -
  -        try
  -        {
  -            m_mutex.acquire();
  -
  -            try
  -            {
  -                m_pipelines.remove( pipeline );
  -
  -                if( m_pipelines.isEmpty() )
  -                {
  -                    m_done = true;
  -                    m_threadControl.join( 1000 );
  -                }
  -            }
  -            finally
  -            {
  -                m_mutex.release();
  -            }
  -        }
  -        catch( InterruptedException ie )
  -        {
  -            // ignore for now
  -        }
  -    }
  -
  -    /**
  -     * Deregisters all EventPipelines from this ThreadManager
  -     */
  -    public void deregisterAll()
  -    {
  -        if( !this.m_initialized )
  -        {
  -            throw new IllegalStateException( "ThreadManager must be initialized before deregistering pipelines" );
  -        }
  -
  -        try
  -        {
  -            m_mutex.acquire();
  -            try
  -            {
  -                m_done = true;
  -                m_pipelines.clear();
  -
  -                m_threadControl.join( 1000 );
  -            }
  -            finally
  -            {
  -                m_mutex.release();
  -            }
  -        }
  -        catch( InterruptedException ie )
  -        {
  -            // ignore for now
  -        }
  -    }
  -
  -    public final void dispose()
  -    {
  -        deregisterAll();
  -
  -        if( m_threadPool instanceof Disposable )
  -        {
  -            ( ( Disposable ) m_threadPool ).dispose();
  -        }
  -
  -        m_threadControl = null;
  -    }
  -
  -    public void run()
  -    {
  -        try
  -        {
  -            while( !m_done )
  -            {
  -                m_mutex.acquire();
  -
  -                try
  -                {
  -                    Iterator i = m_pipelines.values().iterator();
  +        setThreadPool( tpool );
   
  -                    while( i.hasNext() )
  -                    {
  -                        try
  -                        {
  -                            m_threadPool.execute( ( PipelineRunner ) i.next() );
  -                        }
  -                        catch( IllegalStateException e )
  -                        {
  -                            // that's the way ResourceLimitingThreadPool reports
  -                            // that it has no threads available, will still try
  -                            // to go on, hopefully at one point there will be
  -                            // a thread to execute our runner
  -
  -                            if( getLogger().isWarnEnabled() )
  -                            {
  -                                getLogger().warn( "Unable to execute pipeline (If out of threads, "
  -                                                  + "increase block-timeout or number of threads per processor", e );
  -                            }
  -                        }
  -                    }
  -                }
  -                finally
  -                {
  -                    m_mutex.release();
  -                }
  -
  -                Thread.sleep( m_sleepTime );
  -            }
  -        }
  -        catch( InterruptedException e )
  -        {
  -            Thread.interrupted();
  -        }
  -        catch( RuntimeException e )
  -        {
  -            if( getLogger().isFatalErrorEnabled() )
  -            {
  -                getLogger().fatalError( "TPCThreadManager management thread aborting "
  -                                        + " due to exception", e );
  -            }
  -
  -            throw e;
  -        }
  -    }
  -
  -    public static final class PipelineRunner
  -        extends AbstractLogEnabled
  -        implements Runnable
  -    {
  -        private final EventPipeline m_pipeline;
  -
  -        protected PipelineRunner( EventPipeline pipeline )
  -        {
  -            m_pipeline = pipeline;
  -        }
  -
  -        public void run()
  -        {
  -            Source[] sources = m_pipeline.getSources();
  -            EventHandler handler = m_pipeline.getEventHandler();
  -
  -            for( int i = 0; i < sources.length; i++ )
  -            {
  -                try
  -                {
  -                    handler.handleEvents( sources[i].dequeueAll() );
  -                }
  -                catch( RuntimeException e )
  -                {
  -                    // We want to catch this, because this is the only
  -                    // place where exceptions happening in this thread
  -                    // can be logged
  -
  -                    if( getLogger().isErrorEnabled() )
  -                    {
  -                        getLogger().error( "Exception processing EventPipeline [msg: " + e.getMessage() + "]",
  -                                           e );
  -                    }
  -                }
  -            }
  -        }
  +        super.initialize();
       }
   }
  
  
  
  1.1                  jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/AbstractThreadManager.java
  
  Index: AbstractThreadManager.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.excalibur.event.command;
  
  import java.util.HashMap;
  import java.util.Iterator;
  
  import org.apache.avalon.excalibur.concurrent.Mutex;
  import org.apache.avalon.excalibur.thread.ThreadControl;
  import org.apache.avalon.excalibur.thread.ThreadPool;
  import org.apache.avalon.framework.activity.Disposable;
  import org.apache.avalon.framework.activity.Initializable;
  import org.apache.avalon.framework.logger.AbstractLogEnabled;
  import org.apache.excalibur.event.EventHandler;
  import org.apache.excalibur.event.Source;
  
  /**
   * Abstract base class for a ThreadManager that has a single ThreadPool for all pipelines.
   *
   * <p>The manager itself is a {@link Runnable}: {@link #initialize()} hands it to the
   * ThreadPool, and while that management task is running it repeatedly submits one
   * {@link PipelineRunner} per registered pipeline to the same ThreadPool, sleeping
   * for the configured sleep time between passes. Subclasses must supply the
   * ThreadPool through {@link #setThreadPool(ThreadPool)} before
   * {@link #initialize()} is called.</p>
   *
   * @author <a href="mailto:bloritsch@apache.org">Berin Loritsch</a>
   * @author <a href="mailto:proyal@apache.org">Peter Royal</a>
   */
  public abstract class AbstractThreadManager extends AbstractLogEnabled
      implements Runnable, ThreadManager, Initializable, Disposable
  {
      /** Held while the pipeline map is read or modified and while the management task is stopped or restarted. */
      private final Mutex m_mutex = new Mutex();
  
      /** Maps each registered EventPipeline to the PipelineRunner that services it. */
      private final HashMap m_pipelines = new HashMap();
  
      /** Pool that executes both the management task and the pipeline runners; set once. */
      private ThreadPool m_threadPool;
  
      /** Handle returned by the pool for the most recently started management task. */
      private ThreadControl m_threadControl;
  
      /**
       * Tells the management loop in {@link #run()} to stop. It is volatile because
       * the loop tests it without holding the mutex while other threads set it.
       */
      private volatile boolean m_done = false;
  
      /** Pause between two passes of the management loop, passed to Thread.sleep(). */
      private long m_sleepTime = 1000L;
  
      /** Set to true once {@link #initialize()} has started the management task. */
      private boolean m_initialized = false;
  
      /**
       * Reports whether {@link #initialize()} has completed.
       *
       * @return true once the management task has been started
       */
      protected boolean isInitialized()
      {
          return m_initialized;
      }
  
      /**
       * Sets the pause between two passes of the management loop.
       *
       * @param sleepTime the pause, in milliseconds (it is passed to Thread.sleep())
       */
      protected void setSleepTime( long sleepTime )
      {
          m_sleepTime = sleepTime;
      }
  
      /**
       * Supplies the ThreadPool used by this manager. The pool can be set only
       * while none is held yet.
       *
       * @param threadPool the pool to use
       * @throws IllegalStateException if a pool has already been set
       */
      protected void setThreadPool( ThreadPool threadPool )
      {
          if( null == m_threadPool )
          {
              m_threadPool = threadPool;
          }
          else
          {
              throw new IllegalStateException( "Can only set thread pool once" );
          }
      }
  
      /**
       * Starts the management task on the ThreadPool.
       *
       * @throws IllegalStateException if the manager is already initialized or
       *         no ThreadPool has been set
       * @throws Exception declared for subclasses and the Initializable contract
       */
      public void initialize() throws Exception
      {
          // A second call would start a second management task and lose the
          // handle to the first one, so reject it.
          if( isInitialized() )
          {
              throw new IllegalStateException( "ThreadManager is already initialized" );
          }
  
          if( null == m_threadPool )
          {
              throw new IllegalStateException( "No thread pool set" );
          }
  
          this.m_threadControl = this.m_threadPool.execute( this );
          this.m_initialized = true;
      }
  
      /**
       * Register an EventPipeline with the ThreadManager.
       *
       * <p>If the management task was stopped because no pipelines were left, it
       * is started again. If the calling thread is interrupted while waiting for
       * the mutex, the pipeline is not registered and the interrupt status of the
       * calling thread is restored.</p>
       *
       * @param pipeline the pipeline to service
       * @throws IllegalStateException if the manager has not been initialized
       */
      public void register( EventPipeline pipeline )
      {
          if( !isInitialized() )
          {
              throw new IllegalStateException( "ThreadManager must be initialized before "
                                               + "registering a pipeline" );
          }
  
          try
          {
              m_mutex.acquire();
  
              try
              {
                  PipelineRunner runner = new PipelineRunner( pipeline );
                  runner.enableLogging( getLogger() );
                  m_pipelines.put( pipeline, runner );
  
                  if( m_done )
                  {
                      // The flag has to be cleared before the task is handed to
                      // the pool; otherwise the restarted run() sees it still
                      // set and returns immediately, so nothing gets serviced.
                      m_done = false;
  
                      try
                      {
                          m_threadControl = m_threadPool.execute( this );
                      }
                      catch( RuntimeException e )
                      {
                          // Nothing was started: leave the manager marked as
                          // stopped so that a later register() tries again.
                          m_done = true;
                          throw e;
                      }
                  }
              }
              finally
              {
                  m_mutex.release();
              }
          }
          catch( InterruptedException ie )
          {
              // Do not lose the interruption; let the caller observe it.
              Thread.currentThread().interrupt();
          }
      }
  
      /**
       * Deregister an EventPipeline with the ThreadManager.
       *
       * <p>When the last pipeline is removed the management task is told to stop
       * and this method waits for it, bounded by the join( 1000 ) timeout. If the
       * calling thread is interrupted, its interrupt status is restored.</p>
       *
       * @param pipeline the pipeline to stop servicing
       * @throws IllegalStateException if the manager has not been initialized
       */
      public void deregister( EventPipeline pipeline )
      {
          if( !isInitialized() )
          {
              throw new IllegalStateException( "ThreadManager must be initialized before "
                                               + "deregistering a pipeline" );
          }
  
          try
          {
              m_mutex.acquire();
  
              try
              {
                  m_pipelines.remove( pipeline );
  
                  if( m_pipelines.isEmpty() )
                  {
                      m_done = true;
                      m_threadControl.join( 1000 );
                  }
              }
              finally
              {
                  m_mutex.release();
              }
          }
          catch( InterruptedException ie )
          {
              // Do not lose the interruption; let the caller observe it.
              Thread.currentThread().interrupt();
          }
      }
  
      /**
       * Deregisters all EventPipelines from this ThreadManager.
       *
       * <p>The management task is told to stop and this method waits for it,
       * bounded by the join( 1000 ) timeout. If the calling thread is interrupted,
       * its interrupt status is restored.</p>
       *
       * @throws IllegalStateException if the manager has not been initialized
       */
      public void deregisterAll()
      {
          if( !isInitialized() )
          {
              throw new IllegalStateException( "ThreadManager must be initialized "
                                               + "before deregistering pipelines" );
          }
  
          try
          {
              m_mutex.acquire();
              try
              {
                  m_done = true;
                  m_pipelines.clear();
  
                  m_threadControl.join( 1000 );
              }
              finally
              {
                  m_mutex.release();
              }
          }
          catch( InterruptedException ie )
          {
              // Do not lose the interruption; let the caller observe it.
              Thread.currentThread().interrupt();
          }
      }
  
      /**
       * Deregisters every pipeline, disposes the ThreadPool if it implements
       * Disposable, and drops the handle to the management task.
       *
       * @throws IllegalStateException if the manager has not been initialized
       *         (raised by {@link #deregisterAll()})
       */
      public final void dispose()
      {
          deregisterAll();
  
          if( m_threadPool instanceof Disposable )
          {
              ( ( Disposable ) m_threadPool ).dispose();
          }
  
          m_threadControl = null;
      }
  
      /**
       * The management loop. Until told to stop, each pass submits every
       * registered PipelineRunner to the ThreadPool while holding the mutex and
       * then sleeps for the configured sleep time. The loop ends quietly when
       * the thread is interrupted; a RuntimeException is logged and rethrown.
       */
      public void run()
      {
          try
          {
              while( !m_done )
              {
                  m_mutex.acquire();
  
                  try
                  {
                      Iterator i = m_pipelines.values().iterator();
  
                      while( i.hasNext() )
                      {
                          try
                          {
                              m_threadPool.execute( ( PipelineRunner ) i.next() );
                          }
                          catch( IllegalStateException e )
                          {
                              // that's the way ResourceLimitingThreadPool reports
                              // that it has no threads available, will still try
                              // to go on, hopefully at one point there will be
                              // a thread to execute our runner
  
                              if( getLogger().isWarnEnabled() )
                              {
                                  getLogger().warn( "Unable to execute pipeline (If out of threads, "
                                                    + "increase block-timeout or number of threads "
                                                    + "per processor", e );
                              }
                          }
                      }
                  }
                  finally
                  {
                      m_mutex.release();
                  }
  
                  Thread.sleep( m_sleepTime );
              }
          }
          catch( InterruptedException e )
          {
              // Clears the interrupt status of the thread running this task.
              // NOTE(review): presumably so the pooled worker thread is handed
              // back without a pending interrupt - confirm against the pool.
              Thread.interrupted();
          }
          catch( RuntimeException e )
          {
              if( getLogger().isFatalErrorEnabled() )
              {
                  getLogger().fatalError( "ThreadManager management thread aborting "
                                          + " due to exception", e );
              }
  
              throw e;
          }
      }
  
      /**
       * Runnable that performs one servicing pass over a single EventPipeline:
       * for every Source of the pipeline it dequeues all pending elements and
       * passes them to the pipeline's EventHandler.
       */
      public static final class PipelineRunner
          extends AbstractLogEnabled
          implements Runnable
      {
          /** The pipeline serviced by this runner. */
          private final EventPipeline m_pipeline;
  
          /**
           * Creates a runner for the given pipeline.
           *
           * @param pipeline the pipeline whose sources are drained by {@link #run()}
           */
          protected PipelineRunner( EventPipeline pipeline )
          {
              m_pipeline = pipeline;
          }
  
          /**
           * Drains every source of the pipeline into its event handler. A
           * RuntimeException raised for one source is logged and does not stop
           * the remaining sources from being processed.
           */
          public void run()
          {
              Source[] sources = m_pipeline.getSources();
              EventHandler handler = m_pipeline.getEventHandler();
  
              for( int i = 0; i < sources.length; i++ )
              {
                  try
                  {
                      handler.handleEvents( sources[i].dequeueAll() );
                  }
                  catch( RuntimeException e )
                  {
                      // We want to catch this, because this is the only
                      // place where exceptions happening in this thread
                      // can be logged
  
                      if( getLogger().isErrorEnabled() )
                      {
                          getLogger().error( "Exception processing EventPipeline [msg: "
                                             + e.getMessage() + "]", e );
                      }
                  }
              }
          }
      }
  }
  
  
  
  1.1                  jakarta-avalon-excalibur/event/src/java/org/apache/excalibur/event/command/DefaultThreadManager.java
  
  Index: DefaultThreadManager.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE.txt file.
   */
  package org.apache.excalibur.event.command;
  
  import org.apache.avalon.excalibur.thread.ThreadPool;
  
  /**
   * ThreadManager that runs on a ThreadPool supplied by the caller instead of
   * building one of its own. That makes it possible for several ThreadManagers
   * to talk to a commonly defined set of ThreadPools, for example the pools
   * handed out by
   * <a href="http://jakarta.apache.org/avalon/cornerstone">Cornerstone's</a>
   * (similarly named) ThreadManager, which manages ThreadPools.
   *
   * @see org.apache.avalon.cornerstone.services.thread.ThreadManager
   * @author <a href="mailto:proyal@apache.org">Peter Royal</a>
   */
  public class DefaultThreadManager extends AbstractThreadManager
  {
      /**
       * Creates a manager bound to the given pool. The pool is passed straight
       * to {@link #setThreadPool(ThreadPool)}.
       *
       * @param pool the ThreadPool this manager submits its work to
       */
      public DefaultThreadManager( final ThreadPool pool )
      {
          super();
          this.setThreadPool( pool );
      }
  }
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>