You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2004/05/25 07:38:07 UTC

cvs commit: jakarta-turbine-jcs/src/java/org/apache/jcs/engine CacheEventQueue.java

asmuts      2004/05/24 22:38:07

  Modified:    src/java/org/apache/jcs/engine CacheEventQueue.java
  Log:
  Updated to lazy event queue thread creation.
  
  Based on Travis Savo's patch.
  
  Revision  Changes    Path
  1.9       +531 -368  jakarta-turbine-jcs/src/java/org/apache/jcs/engine/CacheEventQueue.java
  
  Index: CacheEventQueue.java
  ===================================================================
  RCS file: /home/cvs/jakarta-turbine-jcs/src/java/org/apache/jcs/engine/CacheEventQueue.java,v
  retrieving revision 1.8
  retrieving revision 1.9
  diff -u -r1.8 -r1.9
  --- CacheEventQueue.java	25 May 2004 05:27:43 -0000	1.8
  +++ CacheEventQueue.java	25 May 2004 05:38:06 -0000	1.9
  @@ -1,6 +1,5 @@
   package org.apache.jcs.engine;
   
  -
   /*
    * Copyright 2001-2004 The Apache Software Foundation.
    *
  @@ -17,7 +16,6 @@
    * limitations under the License.
    */
   
  -
   import java.io.IOException;
   import java.io.Serializable;
   
  @@ -31,482 +29,647 @@
    * An event queue is used to propagate ordered cache events to one and only one
    * target listener.
    *
  - * <pre>
  - * Changes:<br>
  - * 17 April 2004  Hanson Char
  - * <ol><li>Bug fix: add missing synchronization to method addRemoveEvent();</li>
  - * <li>Use the light weight new int[0] for creating the object monitor queueLock,
  - * instead of new Object();</li>
  - * <li>Explicitely qualify member variables of CacheEventQueue in inner classes.
  - * Hopefully this will help identify any potential concurrency issue.</li>
  - * </ol>
  - * </pre>
  + * This is a modified version of the experimental version.
  + * It should lazily initialize the processor thread, and kill the thread if
  + * the queue goes empty for a specified period, now set to 1 minute.  If
  + * something comes in after that a new processor thread should be created.
  + *
  + * I didn't get all of Hanson's changes in yet, but I did add the synchronization.
    */
  -public class CacheEventQueue implements ICacheEventQueue
  +public class CacheEventQueue
  +    implements ICacheEventQueue
   {
  -    private final static Log log = LogFactory.getLog( CacheEventQueue.class );
  +  private static final Log log = LogFactory.getLog( CacheEventQueue.class );
   
  -    private static int processorInstanceCount = 0;
  +  // private LinkedQueue queue = new LinkedQueue();
   
  -    // private LinkedQueue queue = new LinkedQueue();
  +  // time to wait for an event before snuffing the background thread
  +  // if the queue is empty.
  +  // make configurable later
  +  private int waitToDieMillis = 60000;
   
  -    private ICacheListener listener;
  -    private long listenerId;
  -    private String cacheName;
  +  private ICacheListener listener;
  +  private long listenerId;
  +  private String cacheName;
   
  -    private int failureCount;
  -    private int maxFailure;
  +  private int failureCount;
  +  private int maxFailure;
   
  -    // in milliseconds
  -    private int waitBeforeRetry;
  +  // in milliseconds
  +  private int waitBeforeRetry;
   
  -    private boolean destroyed;
  -    private Thread t;
  +  private boolean destroyed = true;
  +  private boolean working = true;
  +  private Thread processorThread;
   
  -    // Internal queue implementation
  +  // Internal queue implementation
   
  -    private Object queueLock = new int[0];
  +  private Object queueLock = new Object();
   
  -    // Dummy node
  +  // Dummy node
   
  -    private Node head = new Node();
  -    private Node tail = head;
  +  private Node head = new Node();
  +  private Node tail = head;
   
  -    /**
  -     * Constructs with the specified listener and the cache name.
  -     *
  -     * @param listener
  -     * @param listenerId
  -     * @param cacheName
  -     */
  -    public CacheEventQueue( ICacheListener listener,
  -                            long listenerId,
  -                            String cacheName )
  +  /**
  +   * Constructs with the specified listener and the cache name.
  +   *
  +   * @param listener
  +   * @param listenerId
  +   * @param cacheName
  +   */
  +  public CacheEventQueue( ICacheListener listener, long listenerId,
  +                          String cacheName )
  +  {
  +    this( listener, listenerId, cacheName, 10, 500 );
  +  }
  +
  +  /**
  +   * Constructor for the CacheEventQueue object
  +   *
  +   * @param listener
  +   * @param listenerId
  +   * @param cacheName
  +   * @param maxFailure
  +   * @param waitBeforeRetry
  +   */
  +  public CacheEventQueue(
  +      ICacheListener listener,
  +      long listenerId,
  +      String cacheName,
  +      int maxFailure,
  +      int waitBeforeRetry )
  +  {
  +    if ( listener == null )
       {
  -        this( listener, listenerId, cacheName, 10, 500 );
  +      throw new IllegalArgumentException( "listener must not be null" );
       }
   
  -    /**
  -     * Constructor for the CacheEventQueue object
  -     *
  -     * @param listener
  -     * @param listenerId
  -     * @param cacheName
  -     * @param maxFailure
  -     * @param waitBeforeRetry
  -     */
  -    public CacheEventQueue( ICacheListener listener,
  -                            long listenerId,
  -                            String cacheName,
  -                            int maxFailure,
  -                            int waitBeforeRetry )
  +    this.listener = listener;
  +    this.listenerId = listenerId;
  +    this.cacheName = cacheName;
  +    this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
  +    this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
  +
  +    if ( log.isDebugEnabled() )
       {
  -        if ( listener == null )
  -        {
  -            throw new IllegalArgumentException( "listener must not be null" );
  -        }
  +      log.debug( "Constructed: " + this );
  +    }
  +  }
   
  -        this.listener = listener;
  -        this.listenerId = listenerId;
  -        this.cacheName = cacheName;
  -        this.maxFailure = maxFailure <= 0 ? 10 : maxFailure;
  -        this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
  +  /**
  +   * Event Q is empty: marks the queue as destroyed and releases the processor thread.
  +   */
  +  public synchronized void stopProcessing()
  +  {
   
  -        this.t = new QProcessor();
  -        this.t.start();
  +    destroyed = true;
  +    processorThread = null;
   
  -        if ( log.isDebugEnabled() )
  -        {
  -            log.debug( "Constructed: " + this );
  -        }
  -    }
  +  }
   
  -    /**
  -     * Event Q is emtpy.
  -     */
  -    public synchronized void destroy()
  -    {
  -        if ( !this.destroyed )
  -        {
  -            this.destroyed = true;
  +  /**
  +   * Returns the time to wait for events before killing the background thread.
  +   */
  +  public int getWaitToDieMillis()
  +  {
  +    return waitToDieMillis;
  +  }
   
  -            // sychronize on queue so the thread will not wait forever,
  -            // and then interrupt the QueueProcessor
  +  /**
  +   * Sets the time to wait for events before killing the background thread.
  +   */
  +  public void setWaitToDieMillis( int wtdm)
  +  {
  +    waitToDieMillis = wtdm;
  +  }
   
  -            synchronized ( this.queueLock )
  -            {
  -                this.t.interrupt();
  -            }
  +  /**
  +   * @return
  +   */
  +  public String toString()
  +  {
  +    return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" +
  +        cacheName + "]";
  +  }
  +
  +  /**
  +   * @return true if the queue has not been destroyed
  +   */
  +  public boolean isAlive()
  +  {
  +    return ( !destroyed );
  +  }
  +
  +  public void setAlive( boolean aState )
  +  {
  +    destroyed = !aState;
  +  }
  +
  +  /**
  +   * @return The listener id
  +   */
  +  public long getListenerId()
  +  {
  +    return listenerId;
  +  }
  +
  +  /**
  +   * Destroys the event Q: interrupts the processor thread, if any, and releases it.
  +   */
  +  public synchronized void destroy()
  +  {
  +    if ( !destroyed )
  +    {
  +      destroyed = true;
   
  -            this.t = null;
  +      // sychronize on queue so the thread will not wait forever,
  +      // and then interrupt the QueueProcessor
   
  -            log.info( "Cache event queue destroyed: " + this );
  +      if ( processorThread != null )
  +      {
  +        synchronized ( queueLock )
  +        {
  +          processorThread.interrupt();
           }
  +      }
  +      processorThread = null;
  +
  +      log.info( "Cache event queue destroyed: " + this );
       }
  +  }
   
  -    /**
  -     * @return
  -     */
  -    public String toString()
  +  /**
  +   * @param ce
  +   *          The feature to be added to the PutEvent attribute
  +   * @exception IOException
  +   */
  +  public synchronized void addPutEvent( ICacheElement ce ) throws IOException
  +  {
  +    if ( isWorking() )
       {
  -        return "listenerId=" + this.listenerId + ", cacheName=" + this.cacheName;
  +      put( new PutEvent( ce ) );
       }
  -
  -    /**
  -     * @return The {3} value
  -     */
  -    public boolean isAlive()
  +    else
       {
  -        return ( !this.destroyed );
  +      if ( log.isWarnEnabled() )
  +      {
  +        log.warn( "Not enqueuing Put Event for [" +
  +                     this +"] because it's non-functional." );
  +      }
       }
  +  }
   
  -    /**
  -     * @return The {3} value
  -     */
  -    public long getListenerId()
  +  /**
  +   * @param key
  +   *          The feature to be added to the RemoveEvent attribute
  +   * @exception IOException
  +   */
  +  public synchronized void addRemoveEvent( Serializable key ) throws IOException
  +  {
  +    if ( isWorking() )
       {
  -        return this.listenerId;
  +      put( new RemoveEvent( key ) );
       }
  -
  -    /**
  -     * @param ce The feature to be added to the PutEvent attribute
  -     * @exception IOException
  -     */
  -    public synchronized void addPutEvent( ICacheElement ce )
  -        throws IOException
  +    else
       {
  -        if ( !this.destroyed )
  -        {
  -            put( new PutEvent( ce ) );
  -        }
  +      if ( log.isWarnEnabled() )
  +      {
  +        log.warn( "Not enqueuing Remove Event for [" +
  +                     this +"] because it's non-functional." );
  +      }
       }
  +  }
   
  -    /**
  -     * @param key The feature to be added to the RemoveEvent attribute
  -     * @exception IOException
  -     */
  -    public synchronized void addRemoveEvent( Serializable key )
  -        throws IOException
  +  /**
  +   * @exception IOException
  +   */
  +  public synchronized void addRemoveAllEvent() throws IOException
  +  {
  +    if ( isWorking() )
       {
  -        if ( !this.destroyed )
  -        {
  -            put( new RemoveEvent( key ) );
  -        }
  +      put( new RemoveAllEvent() );
       }
  -
  -    /**
  -     * @exception IOException
  -     */
  -    public synchronized void addRemoveAllEvent()
  -        throws IOException
  +    else
       {
  -        if ( !this.destroyed )
  -        {
  -            put( new RemoveAllEvent() );
  -        }
  +      if ( log.isWarnEnabled() )
  +      {
  +        log.warn( "Not enqueuing RemoveAll Event for [" +
  +                     this +"] because it's non-functional." );
  +      }
       }
  +  }
   
  -    /**
  -     * @exception IOException
  -     */
  -    public synchronized void addDisposeEvent()
  -        throws IOException
  +  /**
  +   * @exception IOException
  +   */
  +  public synchronized void addDisposeEvent() throws IOException
  +  {
  +    if ( isWorking() )
       {
  -        if ( !this.destroyed )
  -        {
  -            put( new DisposeEvent() );
  -        }
  +      put( new DisposeEvent() );
  +    }
  +    else
  +    {
  +      if ( log.isWarnEnabled() )
  +      {
  +        log.warn( "Not enqueuing Dispose Event for [" +
  +                     this +"] because it's non-functional." );
  +      }
       }
  +  }
   
  -    /**
  -     * Adds an event to the queue.
  -     *
  -     * @param event
  -     */
  -    private void put( AbstractCacheEvent event )
  +  /**
  +   * Adds an event to the queue.
  +   *
  +   * @param event
  +   */
  +  private void put( AbstractCacheEvent event )
  +  {
  +    Node newNode = new Node();
  +    if ( log.isDebugEnabled() )
       {
  -        Node newNode = new Node();
  +      log.debug( "Event entering Queue for " + cacheName + ": " + event );
  +    }
   
  -        newNode.event = event;
  +    newNode.event = event;
   
  -        synchronized ( this.queueLock )
  +    synchronized ( queueLock )
  +    {
  +      tail.next = newNode;
  +      tail = newNode;
  +      if ( isWorking() )
  +      {
  +        if ( !isAlive() )
           {
  -            this.tail.next = newNode;
  -            this.tail = newNode;
  -
  -            this.queueLock.notify();
  +          destroyed = false;
  +          processorThread = new QProcessor( this );
  +          processorThread.start();
  +        }
  +        else
  +        {
  +          queueLock.notify();
           }
  +      }
       }
  +  }
   
  -    private AbstractCacheEvent take() throws InterruptedException
  +  /**
  +   * Returns the next cache event from the queue or null if there are no events
  +   * in the queue.
  +   *
  +   */
  +  private AbstractCacheEvent take()
  +  {
  +    synchronized ( queueLock )
       {
  -        synchronized ( this.queueLock )
  -        {
  -            // wait until there is something to read
  +      // nothing to read: return null instead of waiting
  +      if ( head == tail )
  +      {
  +        return null;
  +      }
   
  -            while ( this.head == this.tail )
  -            {
  -                this.queueLock.wait();
  -            }
  +      Node node = head.next;
   
  -            // we have the lock, and the list is not empty
  +      AbstractCacheEvent value = node.event;
   
  -            Node node = this.head.next;
  +      if ( log.isDebugEnabled() )
  +      {
  +        log.debug( "head.event = " + head.event );
  +        log.debug( "node.event = " + node.event );
  +      }
   
  -            // This is an awful bug.  This will always return null.
  -            // This make the event Q and event destroyer.
  -            //AbstractCacheEvent value = head.event;
  +      // Node becomes the new head (head is always empty)
   
  -            // corrected
  -            AbstractCacheEvent value = node.event;
  +      node.event = null;
  +      head = node;
   
  -            if ( log.isDebugEnabled() )
  -            {
  -              log.debug( "head.event = " + this.head.event );
  -              log.debug( "node.event = " + node.event );
  -            }
  +      return value;
  +    }
  +  }
   
  -            // Node becomes the new head (head is always empty)
  +  ///////////////////////////// Inner classes /////////////////////////////
   
  -            node.event = null;
  -            this.head = node;
  +  private static class Node
  +  {
  +    Node next = null;
  +    AbstractCacheEvent event = null;
  +  }
   
  -            return value;
  -        }
  -    }
  +  /**
  +   * @author asmuts @created January 15, 2002
  +   */
  +  private class QProcessor
  +      extends Thread
  +  {
  +    CacheEventQueue queue;
  +    /**
  +     * Constructor for the QProcessor object
  +     */
  +    QProcessor( CacheEventQueue aQueue )
  +    {
   
  -    ///////////////////////////// Inner classes /////////////////////////////
  +      super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
   
  -    private static class Node
  -    {
  -        Node next = null;
  -        AbstractCacheEvent event = null;
  +      setDaemon( true );
  +      queue = aQueue;
       }
   
       /**
  +     * Main processing method for the QProcessor object.
  +     *
  +     * Waits for a specified time (waitToDieMillis) for something to come in
  +     * and if no new events come in during that period the run method can exit
  +     * and the thread is dereferenced.
        */
  -    private class QProcessor extends Thread
  +    public void run()
       {
  -        /**
  -         * Constructor for the QProcessor object
  -         */
  -        QProcessor()
  -        {
  -            super( "CacheEventQueue.QProcessor-" + ( ++CacheEventQueue.this.processorInstanceCount ) );
  +      AbstractCacheEvent r = null;
   
  -            setDaemon( true );
  +      while ( queue.isAlive() )
  +      {
  +        r = queue.take();
  +
  +        if ( log.isDebugEnabled() )
  +        {
  +          log.debug( "Event from queue = " + r );
           }
   
  -        /**
  -         * Main processing method for the QProcessor object
  -         */
  -        public void run()
  +        if ( r == null )
           {
  -            AbstractCacheEvent r = null;
  +          synchronized ( queueLock )
  +          {
  +            try
  +            {
  +              queueLock.wait( queue.getWaitToDieMillis() );
  +            }
  +            catch ( InterruptedException e )
  +            {
  +              log.warn(
  +                  "Interrupted while waiting for another event to come in before we die." );
  +              return;
  +            }
  +            r = queue.take();
  +            if ( log.isDebugEnabled() )
  +            {
  +              log.debug( "Event from queue after sleep = " + r );
  +            }
  +            if ( r == null )
  +            {
  +              queue.stopProcessing();
  +            }
  +          }
  +        }
   
  -            while ( !CacheEventQueue.this.destroyed )
  +        if ( queue.isWorking() && queue.isAlive() && r != null )
  +        {
  +          r.run();
  +        }
  +      }
  +      if ( log.isInfoEnabled() )
  +      {
  +        log.info( "QProcessor exiting for " + queue );
  +      }
  +    }
  +  }
  +
  +  /**
  +   * Retries before declaring failure.
  +   *
  +   * @author asmuts @created January 15, 2002
  +   */
  +  private abstract class AbstractCacheEvent
  +      implements Runnable
  +  {
  +    int failures = 0;
  +    boolean done = false;
  +
  +    /**
  +     * Main processing method for the AbstractCacheEvent object
  +     */
  +    public void run()
  +    {
  +      try
  +      {
  +        doRun();
  +      }
  +      catch ( IOException e )
  +      {
  +        if ( log.isWarnEnabled() )
  +        {
  +          log.warn( e );
  +        }
  +        if ( ++failures >= maxFailure )
  +        {
  +          if ( log.isWarnEnabled() )
  +          {
  +            log.warn(
  +                "Error while running event from Queue: "
  +                + this
  +                +". Dropping Event and marking Event Queue as non-functional." );
  +          }
  +          setWorking( false );
  +          setAlive( false );
  +          return;
  +        }
  +        else
  +        {
  +          if ( log.isInfoEnabled() )
  +          {
  +            log.info( "Error while running event from Queue: " +
  +                         this +". Retrying..." );
  +          }
  +          try
  +          {
  +            Thread.sleep( waitBeforeRetry );
  +            run();
  +          }
  +          catch ( InterruptedException ie )
  +          {
  +            if ( log.isErrorEnabled() )
               {
  -                try
  -                {
  -                    r = take();
  -
  -                    if ( log.isDebugEnabled() )
  -                    {
  -                        log.debug( "r from take() = " + r );
  -                    }
  -
  -                }
  -                catch ( InterruptedException e )
  -                {
  -                    // We were interrupted, just continue -- the while loop
  -                    // will exit if we have been properly destroyed.
  -                }
  -
  -                if ( !CacheEventQueue.this.destroyed && r != null )
  -                {
  -                    r.run();
  -                }
  +              log.warn( "Interrupted while sleeping for retry on event " +
  +                           this +"." );
               }
  -            // declare failure as listener is permanently unreachable.
  -            // queue = null;
  -            CacheEventQueue.this.listener = null;
  -            // The listener failure logging more the problem of the user
  -            // of the q.
  -            log.info( "QProcessor exiting for " + CacheEventQueue.this );
  +            setWorking( false );
  +            setAlive( false );
  +          }
           }
  +      }
       }
   
       /**
  -     * Retries before declaring failure.
  -     *
  +     * @exception IOException
        */
  -    private abstract class AbstractCacheEvent implements Runnable
  -    {
  -        /**
  -         * Main processing method for the AbstractCacheEvent object
  -         */
  -        public void run()
  -        {
  -            IOException ex = null;
  +    protected abstract void doRun() throws IOException;
  +  }
   
  -            while ( !CacheEventQueue.this.destroyed
  -                    && CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
  -            {
  -                try
  -                {
  -                    ex = null;
  -                    doRun();
  -                    CacheEventQueue.this.failureCount = 0;
  -                    return;
  -                    // happy and done.
  -                }
  -                catch ( IOException e )
  -                {
  -                    CacheEventQueue.this.failureCount++;
  -                    ex = e;
  -                }
  -                // Let's get idle for a while before retry.
  -                if ( !CacheEventQueue.this.destroyed
  -                     && CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
  -                {
  -                    try
  -                    {
  -                        log.warn( "...retrying propagation " + CacheEventQueue.this + "..." + CacheEventQueue.this.failureCount );
  -                        Thread.currentThread().sleep( CacheEventQueue.this.waitBeforeRetry );
  -                    }
  -                    catch ( InterruptedException ie )
  -                    {
  -                        // ignore;
  -                    }
  -                }
  -            }
  -            // Too bad.  The remote host is unreachable, so we give up.
  -            if ( ex != null )
  -            {
  -                log.warn( "Giving up propagation " + CacheEventQueue.this, ex );
  +  /**
  +   * @author asmuts @created January 15, 2002
  +   */
  +  private class PutEvent
  +      extends AbstractCacheEvent
  +  {
   
  -                destroy();
  -            }
  -            return;
  -        }
  +    private ICacheElement ice;
   
  -        /**
  -         * Description of the Method
  -         *
  -         * @exception IOException
  -         */
  -        protected abstract void doRun()
  -            throws IOException;
  +    /**
  +     * Constructor for the PutEvent object
  +     *
  +     * @param ice
  +     * @exception IOException
  +     */
  +    PutEvent( ICacheElement ice ) throws IOException
  +    {
  +      this.ice = ice;
  +      /*
  +       * this.key = key; this.obj = CacheUtils.dup(obj); this.attr = attr; this.groupName = groupName;
  +       */
       }
   
       /**
  +     * Description of the Method
  +     *
  +     * @exception IOException
        */
  -    private class PutEvent extends AbstractCacheEvent
  +    protected void doRun() throws IOException
       {
  +      /*
  +       * CacheElement ce = new CacheElement(cacheName, key, obj); ce.setElementAttributes( attr ); ce.setGroupName(
  +       * groupName );
  +       */
  +      listener.handlePut( ice );
  +    }
   
  -        private ICacheElement ice;
  -
  -        /**
  -         * Constructor for the PutEvent object
  -         *
  -         * @param ice
  -         * @exception IOException
  -         */
  -        PutEvent( ICacheElement ice )
  -            throws IOException
  -        {
  -            this.ice = ice;
  -            /*
  -             * this.key = key;
  -             * this.obj = CacheUtils.dup(obj);
  -             * this.attr = attr;
  -             * this.groupName = groupName;
  -             */
  -        }
  +    public String toString()
  +    {
  +      return new StringBuffer( "PutEvent for key: " )
  +          .append( ice.getKey() )
  +          .append( " value: " )
  +          .append( ice.getVal() )
  +          .toString();
  +    }
  +
  +  }
  +
  +  /**
  +   * Description of the Class
  +   *
  +   * @author asmuts @created January 15, 2002
  +   */
  +  private class RemoveEvent
  +      extends AbstractCacheEvent
  +  {
  +    private Serializable key;
   
  -        /**
  -         * Description of the Method
  -         *
  -         * @exception IOException
  -         */
  -        protected void doRun()
  -            throws IOException
  -        {
  -            /*
  -             * CacheElement ce = new CacheElement(cacheName, key, obj);
  -             * ce.setElementAttributes( attr );
  -             * ce.setGroupName( groupName );
  -             */
  -            CacheEventQueue.this.listener.handlePut( ice );
  -        }
  +    /**
  +     * Constructor for the RemoveEvent object
  +     *
  +     * @param key
  +     * @exception IOException
  +     */
  +    RemoveEvent( Serializable key ) throws IOException
  +    {
  +      this.key = key;
       }
   
       /**
  -     * Description of the Class
  +     * Description of the Method
        *
  +     * @exception IOException
        */
  -    private class RemoveEvent extends AbstractCacheEvent
  +    protected void doRun() throws IOException
       {
  -        private Serializable key;
  -
  -        /**
  -         * Constructor for the RemoveEvent object
  -         *
  -         * @param key
  -         * @exception IOException
  -         */
  -        RemoveEvent( Serializable key )
  -            throws IOException
  -        {
  -            this.key = key;
  -        }
  +      listener.handleRemove( cacheName, key );
  +    }
   
  -        /**
  -         * Description of the Method
  -         *
  -         * @exception IOException
  -         */
  -        protected void doRun()
  -            throws IOException
  -        {
  -            CacheEventQueue.this.listener.handleRemove( CacheEventQueue.this.cacheName, key );
  -        }
  +    /*
  +     * (non-Javadoc)
  +     *
  +     * @see java.lang.Object#toString()
  +     */
  +    public String toString()
  +    {
  +      return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
       }
   
  +  }
  +
  +  /**
  +   * Description of the Class
  +   *
  +   * @author asmuts @created January 15, 2002
  +   */
  +  private class RemoveAllEvent
  +      extends AbstractCacheEvent
  +  {
  +
       /**
  -     * Description of the Class
  +     * Description of the Method
  +     *
  +     * @exception IOException
  +     */
  +    protected void doRun() throws IOException
  +    {
  +      listener.handleRemoveAll( cacheName );
  +    }
  +
  +    /*
  +     * (non-Javadoc)
        *
  +     * @see java.lang.Object#toString()
        */
  -    private class RemoveAllEvent extends AbstractCacheEvent
  +    public String toString()
       {
  -        /**
  -         * Description of the Method
  -         *
  -         * @exception IOException
  -         */
  -        protected void doRun()
  -            throws IOException
  -        {
  -            CacheEventQueue.this.listener.handleRemoveAll( CacheEventQueue.this.cacheName );
  -        }
  +      return "RemoveAllEvent";
       }
   
  +  }
  +
  +  /**
  +   * Description of the Class
  +   *
  +   * @author asmuts @created January 15, 2002
  +   */
  +  private class DisposeEvent
  +      extends AbstractCacheEvent
  +  {
  +
       /**
  -     * Description of the Class
  +     * Called when gets to the end of the queue
        *
  +     * @exception IOException
        */
  -    private class DisposeEvent extends AbstractCacheEvent
  +    protected void doRun() throws IOException
       {
  -        /**
  -         * Description of the Method
  -         *
  -         * @exception IOException
  -         */
  -        protected void doRun()
  -            throws IOException
  -        {
  -            CacheEventQueue.this.listener.handleDispose( CacheEventQueue.this.cacheName );
  -        }
  +      listener.handleDispose( cacheName );
       }
  -}
   
  +    public String toString()
  +    {
  +      return "DisposeEvent";
  +    }
  +  }
  +
  +  /**
  +   * @return
  +   */
  +  public boolean isWorking()
  +  {
  +    return working;
  +  }
  +
  +  /**
  +   * @param b
  +   */
  +  public void setWorking( boolean b )
  +  {
  +    working = b;
  +  }
  +
  +  public boolean isEmpty()
  +  {
  +    return tail == head;
  +  }
  +
  +}
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org