You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2005/01/07 23:30:41 UTC

cvs commit: jakarta-turbine-jcs/src/java/org/apache/jcs/engine PooledCacheEventQueue.java

asmuts      2005/01/07 14:30:41

  Modified:    src/java/org/apache/jcs/utils/threadpool
                        PoolConfiguration.java
  Added:       src/java/org/apache/jcs/engine PooledCacheEventQueue.java
  Removed:     src/experimental/org/apache/jcs/engine
                        PooledCacheEventQueue.java
  Log:
  Added the ability to set auxiliaries to use a thread pools in the event queue.  By default none are used and no
  threadpool configuration is necessary.  Everything is as is unless you configure it otherwise.
  
  No version update.  I need to make sure the indexed disk cache can function
  with more than one thread pulling out of the queue.
  
  Revision  Changes    Path
  1.3       +86 -25    jakarta-turbine-jcs/src/java/org/apache/jcs/utils/threadpool/PoolConfiguration.java
  
  Index: PoolConfiguration.java
  ===================================================================
  RCS file: /home/cvs/jakarta-turbine-jcs/src/java/org/apache/jcs/utils/threadpool/PoolConfiguration.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- PoolConfiguration.java	6 Jan 2005 04:44:27 -0000	1.2
  +++ PoolConfiguration.java	7 Jan 2005 22:30:41 -0000	1.3
  @@ -1,5 +1,7 @@
   package org.apache.jcs.utils.threadpool;
   
  +import org.apache.jcs.utils.threadpool.behavior.IPoolConfiguration;
  +
   /*
    * Copyright 2001-2004 The Apache Software Foundation.
    *
  @@ -18,27 +20,51 @@
   
   /**
    * This object holds configuration data for a thread pool.
  - *
  + * 
    * @author Aaron Smuts
  - *
  + *  
    */
  -public class PoolConfiguration implements Cloneable
  +public class PoolConfiguration implements Cloneable, IPoolConfiguration
   {
  -  private int     boundarySize     = 75;
   
  -  private int     maximumPoolSize  = 150;
  +  private boolean useBoundary       = true;
  +
  +  private int     boundarySize      = 2000;
   
  -  private int     minimumPoolSize  = 4;
  +  // only has meaning if a bounday is used
  +  private int     maximumPoolSize   = 150;
   
  -  private int     keepAliveTime    = 1000 * 60 * 5;
  +  // the exact number that will be used in a boundless queue. If the queue has
  +  // a boundary, more will be created if the queue fills.
  +  private int     minimumPoolSize   = 4;
   
  -  private boolean abortWhenBlocked = false;
  +  private int     keepAliveTime     = 1000 * 60 * 5;
   
  -  private int     startUpSize      = 4;
  +  //should be ABORT, BLOCK, RUN, WAIT, DISCARDOLDEST,
  +  private String  whenBlockedPolicy = POLICY_RUN;
  +
  +  private int     startUpSize       = 4;
  +
  +  /**
  +   * @param useBoundary
  +   *          The useBoundary to set.
  +   */
  +  public void setUseBoundary( boolean useBoundary )
  +  {
  +    this.useBoundary = useBoundary;
  +  }
  +
  +  /**
  +   * @return Returns the useBoundary.
  +   */
  +  public boolean isUseBoundary()
  +  {
  +    return useBoundary;
  +  }
   
     /**
      * Default
  -   *
  +   *  
      */
     public PoolConfiguration()
     {
  @@ -46,7 +72,7 @@
     }
   
     /**
  -   *
  +   * 
      * @param boundarySize
      * @param maximumPoolSize
      * @param minimumPoolSize
  @@ -54,9 +80,9 @@
      * @param abortWhenlocked
      * @param startUpSize
      */
  -  public PoolConfiguration(int boundarySize, int maximumPoolSize,
  -      int minimumPoolSize, int keepAliveTime, boolean abortWhenBlocked,
  -      int startUpSize)
  +  public PoolConfiguration(boolean useBoundary, int boundarySize,
  +      int maximumPoolSize, int minimumPoolSize, int keepAliveTime,
  +      String henBlockedPolicy, int startUpSize)
     {
     }
   
  @@ -129,20 +155,53 @@
     }
   
     /**
  -   * @param abortWhenBlocked
  -   *          The abortWhenBlocked to set.
  +   * @param whenBlockedPolicy
  +   *          The whenBlockedPolicy to set.
      */
  -  public void setAbortWhenBlocked( boolean abortWhenBlocked )
  +  public void setWhenBlockedPolicy( String whenBlockedPolicy )
     {
  -    this.abortWhenBlocked = abortWhenBlocked;
  +    if ( whenBlockedPolicy != null )
  +    {
  +      if (whenBlockedPolicy.equalsIgnoreCase( POLICY_ABORT ))
  +      {
  +        this.whenBlockedPolicy = POLICY_ABORT;
  +      }
  +      else if (whenBlockedPolicy.equalsIgnoreCase( POLICY_RUN ))
  +      {
  +        this.whenBlockedPolicy = POLICY_RUN;
  +      }
  +      else if (whenBlockedPolicy.equalsIgnoreCase( POLICY_BLOCK ))
  +      {
  +        this.whenBlockedPolicy = POLICY_BLOCK;
  +      }
  +      else if (whenBlockedPolicy.equalsIgnoreCase( POLICY_DISCARDOLDEST ))
  +      {
  +        this.whenBlockedPolicy = POLICY_DISCARDOLDEST;
  +      }
  +      else if (whenBlockedPolicy.equalsIgnoreCase( POLICY_WAIT ) )
  +      {
  +        this.whenBlockedPolicy = POLICY_WAIT;
  +      }
  +      else 
  +      {
  +        // the value is invalid, dfault to RUN
  +        this.whenBlockedPolicy = POLICY_RUN;
  +      }      
  +    }
  +    else 
  +    {
  +      // the value is null, dfault to RUN
  +      this.whenBlockedPolicy = POLICY_RUN;      
  +    }
  +    
     }
   
     /**
  -   * @return Returns the abortWhenBlocked.
  +   * @return Returns the whenBlockedPolicy.
      */
  -  public boolean isAbortWhenBlocked()
  +  public String getWhenBlockedPolicy()
     {
  -    return abortWhenBlocked;
  +    return whenBlockedPolicy;
     }
   
     /**
  @@ -168,11 +227,12 @@
     public String toString()
     {
       StringBuffer buf = new StringBuffer();
  +    buf.append( "useBoundary = [" + isUseBoundary() + "]" );
       buf.append( "boundarySize = [" + boundarySize + "]" );
       buf.append( "maximumPoolSize = [" + maximumPoolSize + "]" );
       buf.append( "minimumPoolSize = [" + minimumPoolSize + "]" );
       buf.append( "keepAliveTime = [" + keepAliveTime + "]" );
  -    buf.append( "abortWhenBlocked = [" + abortWhenBlocked + "]" );
  +    buf.append( "whenBlockedPolicy = [" + getWhenBlockedPolicy() + "]" );
       buf.append( "startUpSize = [" + startUpSize + "]" );
       return buf.toString();
     }
  @@ -182,7 +242,8 @@
      */
     public Object clone()
     {
  -    return new PoolConfiguration( boundarySize, maximumPoolSize,
  -        minimumPoolSize, keepAliveTime, abortWhenBlocked, startUpSize );
  +    return new PoolConfiguration( isUseBoundary(), boundarySize,
  +        maximumPoolSize, minimumPoolSize, keepAliveTime,
  +        getWhenBlockedPolicy(), startUpSize );
     }
  -}
  +}
  \ No newline at end of file
  
  
  
  1.1                  jakarta-turbine-jcs/src/java/org/apache/jcs/engine/PooledCacheEventQueue.java
  
  Index: PooledCacheEventQueue.java
  ===================================================================
  package org.apache.jcs.engine;
  
  /*
   * Copyright 2001-2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License")
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  import java.io.IOException;
  import java.io.Serializable;
  import java.util.ArrayList;
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.jcs.engine.behavior.ICacheElement;
  import org.apache.jcs.engine.behavior.ICacheEventQueue;
  import org.apache.jcs.engine.behavior.ICacheListener;
  import org.apache.jcs.engine.stats.StatElement;
  import org.apache.jcs.engine.stats.Stats;
  import org.apache.jcs.engine.stats.behavior.IStatElement;
  import org.apache.jcs.engine.stats.behavior.IStats;
  import org.apache.jcs.utils.threadpool.ThreadPool;
  import org.apache.jcs.utils.threadpool.ThreadPoolManager;
  
  import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
  
  /**
   * An event queue is used to propagate ordered cache events to one and only one
   * target listener.
   * 
   * This is a modified version of the experimental version. It uses a
   * PooledExecutor and a BoundedBuffer to queue up events and execute them as
   * threads become available.
   * 
   * The PooledExecutor is static, because presumably these processes will be IO
   * bound, so throwing more than a few threads at them will serve no purpose
   * other than to saturate the IO interface. In light of this, having one thread
   * per region seems unnecessary. This may prove to be false.
   * 
   * @author Aaron Smuts
   * @author Travis Savo <ts...@ifilm.com>
   *  
   */
  public class PooledCacheEventQueue implements ICacheEventQueue
  {
    
    private static final int queueType = POOLED_QUEUE_TYPE;
    
    private static final Log log             = LogFactory
                                                 .getLog( PooledCacheEventQueue.class );
  
    // time to wait for an event before snuffing the background thread
    // if the queue is empty.
    // make configurable later
    private static int       waitToDieMillis = 10000;
  
    private ICacheListener   listener;
  
    private long             listenerId;
  
    private String           cacheName;
  
    private int              failureCount;
  
    private int              maxFailure;
  
    // in milliseconds
    private int              waitBeforeRetry;
  
    private boolean          destroyed       = true;
  
    private boolean          working         = true;
  
    private Thread           processorThread;
  
    //The Thread Pool to execute events with.
    private ThreadPool       pool            = null;
  
    /**
     * Constructor for the CacheEventQueue object
     * 
     * @param listener
     * @param listenerId
     * @param cacheName
     * @param maxFailure
     * @param waitBeforeRetry
     */
    public PooledCacheEventQueue(ICacheListener listener, long listenerId,
        String cacheName, int maxFailure, int waitBeforeRetry,
        String threadPoolName)
    {
      if (listener == null)
      {
        throw new IllegalArgumentException( "listener must not be null" );
      }
  
      this.listener = listener;
      this.listenerId = listenerId;
      this.cacheName = cacheName;
      this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
      this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
  
      // this will share the same pool with other event queues by default.
      if (threadPoolName == null)
      {
        threadPoolName = "cache_event_queue";
      }
      pool = ThreadPoolManager.getInstance().getPool( threadPoolName );
  
      //When our pool is filling up too fast, we should ditch the oldest
      // event in favor of the newer ones.
      //TODO: Make this configurable as a generic option.
      //pool.getPool().discardOldestWhenBlocked();
  
      if (log.isDebugEnabled())
      {
        log.debug( "Constructed: " + this );
      }
    }
  
    /*
     *  (non-Javadoc)
     * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getQueueType()
     */
    public int getQueueType()
    {
      return queueType;
    }
    
    
    /**
     * Event Q is emtpy.
     */
    public synchronized void stopProcessing()
    {
  
      destroyed = true;
      processorThread = null;
  
    }
  
    /**
     * Returns the time to wait for events before killing the background thread.
     */
    public int getWaitToDieMillis()
    {
      return waitToDieMillis;
    }
  
    /**
     * Sets the time to wait for events before killing the background thread.
     */
    public void setWaitToDieMillis( int wtdm )
    {
      waitToDieMillis = wtdm;
    }
  
    /**
     * @return
     */
    public String toString()
    {
      return "CacheEventQueue [listenerId=" + listenerId + ", cacheName="
          + cacheName + "]";
    }
  
    /**
     * @return The {3} value
     */
    public boolean isAlive()
    {
      return (!destroyed);
    }
  
    public void setAlive( boolean aState )
    {
      destroyed = !aState;
    }
  
    /**
     * @return The {3} value
     */
    public long getListenerId()
    {
      return listenerId;
    }
  
    /**
     * Event Q is emtpy.
     */
    public synchronized void destroy()
    {
      if (!destroyed)
      {
        destroyed = true;
        log.info( "Cache event queue destroyed: " + this );
      }
    }
  
    /**
     * @param ce
     *          The feature to be added to the PutEvent attribute
     * @exception IOException
     */
    public synchronized void addPutEvent( ICacheElement ce ) throws IOException
    {
      if (isWorking())
      {
        put( new PutEvent( ce ) );
      }
      else
      {
        if (log.isWarnEnabled())
        {
          log.warn( "Not enqueuing Put Event for [" + this
              + "] because it's non-functional." );
        }
      }
    }
  
    /**
     * @param key
     *          The feature to be added to the RemoveEvent attribute
     * @exception IOException
     */
    public synchronized void addRemoveEvent( Serializable key )
        throws IOException
    {
      if (isWorking())
      {
        put( new RemoveEvent( key ) );
      }
      else
      {
        if (log.isWarnEnabled())
        {
          log.warn( "Not enqueuing Remove Event for [" + this
              + "] because it's non-functional." );
        }
      }
    }
  
    /**
     * @exception IOException
     */
    public synchronized void addRemoveAllEvent() throws IOException
    {
      if (isWorking())
      {
        put( new RemoveAllEvent() );
      }
      else
      {
        if (log.isWarnEnabled())
        {
          log.warn( "Not enqueuing RemoveAll Event for [" + this
              + "] because it's non-functional." );
        }
      }
    }
  
    /**
     * @exception IOException
     */
    public synchronized void addDisposeEvent() throws IOException
    {
      if (isWorking())
      {
        put( new DisposeEvent() );
      }
      else
      {
        if (log.isWarnEnabled())
        {
          log.warn( "Not enqueuing Dispose Event for [" + this
              + "] because it's non-functional." );
        }
      }
    }
  
    /**
     * Adds an event to the queue.
     * 
     * @param event
     */
    private void put( AbstractCacheEvent event )
    {
      try
      {
        pool.execute( event );
      }
      catch (InterruptedException e)
      {
        log.error( e );
      }
    }
  
    public String getStats()
    {
      return getStatistics().toString();
    }
  
    /*
     * (non-Javadoc)
     * 
     * @see org.apache.jcs.engine.behavior.ICacheEventQueue#getStatistics()
     */
    public IStats getStatistics()
    {
      IStats stats = new Stats();
      stats.setTypeName( "Pooled Cache Event Queue" );
  
      ArrayList elems = new ArrayList();
  
      IStatElement se = null;
  
      se = new StatElement();
      se.setName( "Working" );
      se.setData( "" + this.working );
      elems.add( se );
  
      se.setName( "Destroyed" );
      se = new StatElement();
      se.setData( "" + this.isAlive() );
      elems.add( se );
  
      se.setName( "Empty" );
      se = new StatElement();
      se.setData( "" + this.isEmpty() );
      elems.add( se );
  
      if (pool.getQueue() != null)
      {
        if ( pool.getQueue() instanceof BoundedBuffer )
        {
          BoundedBuffer bb = (BoundedBuffer)pool.getQueue();
          se.setName( "Queue Size" );
          se = new StatElement();
          se.setData( "" + bb.size() );
          elems.add( se );
  
          se.setName( "Queue Capacity" );
          se = new StatElement();
          se.setData( "" + bb.capacity() );
          elems.add( se );        
        }
      }
  
      se.setName( "Pool Size" );
      se = new StatElement();
      se.setData( "" + pool.getPool().getPoolSize() );
      elems.add( se );
  
      se.setName( "Maximum Pool Size" );
      se = new StatElement();
      se.setData( "" + pool.getPool().getMaximumPoolSize() );
      elems.add( se );
  
      // get an array and put them in the Stats object
      IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[0] );
      stats.setStatElements( ses );
  
      return stats;
    }
  
    ///////////////////////////// Inner classes /////////////////////////////
  
    /**
     * Retries before declaring failure.
     * 
     * @author asmuts
     * @created January 15, 2002
     */
    private abstract class AbstractCacheEvent implements Runnable
    {
      int     failures = 0;
  
      boolean done     = false;
  
      /**
       * Main processing method for the AbstractCacheEvent object
       */
      public void run()
      {
        try
        {
          doRun();
        }
        catch (IOException e)
        {
          if (log.isWarnEnabled())
          {
            log.warn( e );
          }
          if (++failures >= maxFailure)
          {
            if (log.isWarnEnabled())
            {
              log
                  .warn( "Error while running event from Queue: "
                      + this
                      + ". Dropping Event and marking Event Queue as non-functional." );
            }
            setWorking( false );
            setAlive( false );
            return;
          }
          if (log.isInfoEnabled())
          {
            log.info( "Error while running event from Queue: " + this
                + ". Retrying..." );
          }
          try
          {
            Thread.sleep( waitBeforeRetry );
            run();
          }
          catch (InterruptedException ie)
          {
            if (log.isErrorEnabled())
            {
              log.warn( "Interrupted while sleeping for retry on event " + this
                  + "." );
            }
            setWorking( false );
            setAlive( false );
          }
        }
      }
  
      /**
       * @exception IOException
       */
      protected abstract void doRun() throws IOException;
    }
  
    /**
     * @author asmuts
     * @created January 15, 2002
     */
    private class PutEvent extends AbstractCacheEvent
    {
  
      private ICacheElement ice;
  
      /**
       * Constructor for the PutEvent object
       * 
       * @param ice
       * @exception IOException
       */
      PutEvent(ICacheElement ice) throws IOException
      {
        this.ice = ice;
        /*
         * this.key = key; this.obj = CacheUtils.dup(obj); this.attr = attr;
         * this.groupName = groupName;
         */
      }
  
      /**
       * Description of the Method
       * 
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        /*
         * CacheElement ce = new CacheElement(cacheName, key, obj);
         * ce.setElementAttributes( attr ); ce.setGroupName( groupName );
         */
        listener.handlePut( ice );
      }
  
      public String toString()
      {
        return new StringBuffer( "PutEvent for key: " ).append( ice.getKey() )
            .append( " value: " ).append( ice.getVal() ).toString();
      }
  
    }
  
    /**
     * Description of the Class
     * 
     * @author asmuts
     * @created January 15, 2002
     */
    private class RemoveEvent extends AbstractCacheEvent
    {
      private Serializable key;
  
      /**
       * Constructor for the RemoveEvent object
       * 
       * @param key
       * @exception IOException
       */
      RemoveEvent(Serializable key) throws IOException
      {
        this.key = key;
      }
  
      /**
       * Description of the Method
       * 
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        listener.handleRemove( cacheName, key );
      }
  
      /*
       * (non-Javadoc)
       * 
       * @see java.lang.Object#toString()
       */
      public String toString()
      {
        return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
      }
  
    }
  
    /**
     * Description of the Class
     * 
     * @author asmuts
     * @created January 15, 2002
     */
    private class RemoveAllEvent extends AbstractCacheEvent
    {
  
      /**
       * Description of the Method
       * 
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        listener.handleRemoveAll( cacheName );
      }
  
      /*
       * (non-Javadoc)
       * 
       * @see java.lang.Object#toString()
       */
      public String toString()
      {
        return "RemoveAllEvent";
      }
  
    }
  
    /**
     * Description of the Class
     * 
     * @author asmuts
     * @created January 15, 2002
     */
    private class DisposeEvent extends AbstractCacheEvent
    {
  
      /**
       * Called when gets to the end of the queue
       * 
       * @exception IOException
       */
      protected void doRun() throws IOException
      {
        listener.handleDispose( cacheName );
      }
  
      public String toString()
      {
        return "DisposeEvent";
      }
    }
  
    /**
     * @return
     */
    public boolean isWorking()
    {
      return working;
    }
  
    /**
     * @param b
     */
    public void setWorking( boolean b )
    {
      working = b;
    }
  
    /**
     * If the Queue is using a bounded channel we can determine the size.
     * If it is zero or we can't determine the size, we return true.
     * 
     * @return
     */
    public boolean isEmpty()
    {
      if ( pool.getQueue() == null )
      {
        return true;      
      }
      else 
      {
          if ( pool.getQueue() instanceof BoundedBuffer )
          {
            BoundedBuffer bb = (BoundedBuffer)pool.getQueue();
            return bb.size() == 0;
          }
          else
          {
            return true;                
          }
      }
    }
  
  }
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org