You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2004/05/25 07:38:07 UTC
cvs commit: jakarta-turbine-jcs/src/java/org/apache/jcs/engine CacheEventQueue.java
asmuts 2004/05/24 22:38:07
Modified: src/java/org/apache/jcs/engine CacheEventQueue.java
Log:
Updated to lazy event queue thread creation.
Based on Travis Savo's patch.
Revision Changes Path
1.9 +531 -368 jakarta-turbine-jcs/src/java/org/apache/jcs/engine/CacheEventQueue.java
Index: CacheEventQueue.java
===================================================================
RCS file: /home/cvs/jakarta-turbine-jcs/src/java/org/apache/jcs/engine/CacheEventQueue.java,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- CacheEventQueue.java 25 May 2004 05:27:43 -0000 1.8
+++ CacheEventQueue.java 25 May 2004 05:38:06 -0000 1.9
@@ -1,6 +1,5 @@
package org.apache.jcs.engine;
-
/*
* Copyright 2001-2004 The Apache Software Foundation.
*
@@ -17,7 +16,6 @@
* limitations under the License.
*/
-
import java.io.IOException;
import java.io.Serializable;
@@ -31,482 +29,647 @@
* An event queue is used to propagate ordered cache events to one and only one
* target listener.
*
- * <pre>
- * Changes:<br>
- * 17 April 2004 Hanson Char
- * <ol><li>Bug fix: add missing synchronization to method addRemoveEvent();</li>
- * <li>Use the light weight new int[0] for creating the object monitor queueLock,
- * instead of new Object();</li>
- * <li>Explicitely qualify member variables of CacheEventQueue in inner classes.
- * Hopefully this will help identify any potential concurrency issue.</li>
- * </ol>
- * </pre>
+ * This is a modified version of the experimental version.
+ * It should lazy initilaize the processor thread, and kill the thread if
+ * the queue goes emtpy for a specified period, now set to 1 minute. If
+ * something comes in after that a new processor thread should be created.
+ *
+ * I didn't get all of Hanson's cahnges in yet, but I did add the syncronization.
*/
-public class CacheEventQueue implements ICacheEventQueue
+public class CacheEventQueue
+ implements ICacheEventQueue
{
- private final static Log log = LogFactory.getLog( CacheEventQueue.class );
+ private static final Log log = LogFactory.getLog( CacheEventQueue.class );
- private static int processorInstanceCount = 0;
+ // private LinkedQueue queue = new LinkedQueue();
- // private LinkedQueue queue = new LinkedQueue();
+ // time to wait for an event before snuffing the background thread
+ // if the queue is empty.
+ // make configurable later
+ private int waitToDieMillis = 60000;
- private ICacheListener listener;
- private long listenerId;
- private String cacheName;
+ private ICacheListener listener;
+ private long listenerId;
+ private String cacheName;
- private int failureCount;
- private int maxFailure;
+ private int failureCount;
+ private int maxFailure;
- // in milliseconds
- private int waitBeforeRetry;
+ // in milliseconds
+ private int waitBeforeRetry;
- private boolean destroyed;
- private Thread t;
+ private boolean destroyed = true;
+ private boolean working = true;
+ private Thread processorThread;
- // Internal queue implementation
+ // Internal queue implementation
- private Object queueLock = new int[0];
+ private Object queueLock = new Object();
- // Dummy node
+ // Dummy node
- private Node head = new Node();
- private Node tail = head;
+ private Node head = new Node();
+ private Node tail = head;
- /**
- * Constructs with the specified listener and the cache name.
- *
- * @param listener
- * @param listenerId
- * @param cacheName
- */
- public CacheEventQueue( ICacheListener listener,
- long listenerId,
- String cacheName )
+ /**
+ * Constructs with the specified listener and the cache name.
+ *
+ * @param listener
+ * @param listenerId
+ * @param cacheName
+ */
+ public CacheEventQueue( ICacheListener listener, long listenerId,
+ String cacheName )
+ {
+ this( listener, listenerId, cacheName, 10, 500 );
+ }
+
+ /**
+ * Constructor for the CacheEventQueue object
+ *
+ * @param listener
+ * @param listenerId
+ * @param cacheName
+ * @param maxFailure
+ * @param waitBeforeRetry
+ */
+ public CacheEventQueue(
+ ICacheListener listener,
+ long listenerId,
+ String cacheName,
+ int maxFailure,
+ int waitBeforeRetry )
+ {
+ if ( listener == null )
{
- this( listener, listenerId, cacheName, 10, 500 );
+ throw new IllegalArgumentException( "listener must not be null" );
}
- /**
- * Constructor for the CacheEventQueue object
- *
- * @param listener
- * @param listenerId
- * @param cacheName
- * @param maxFailure
- * @param waitBeforeRetry
- */
- public CacheEventQueue( ICacheListener listener,
- long listenerId,
- String cacheName,
- int maxFailure,
- int waitBeforeRetry )
+ this.listener = listener;
+ this.listenerId = listenerId;
+ this.cacheName = cacheName;
+ this.maxFailure = maxFailure <= 0 ? 3 : maxFailure;
+ this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
+
+ if ( log.isDebugEnabled() )
{
- if ( listener == null )
- {
- throw new IllegalArgumentException( "listener must not be null" );
- }
+ log.debug( "Constructed: " + this );
+ }
+ }
- this.listener = listener;
- this.listenerId = listenerId;
- this.cacheName = cacheName;
- this.maxFailure = maxFailure <= 0 ? 10 : maxFailure;
- this.waitBeforeRetry = waitBeforeRetry <= 0 ? 500 : waitBeforeRetry;
+ /**
+ * Event Q is emtpy.
+ */
+ public synchronized void stopProcessing()
+ {
- this.t = new QProcessor();
- this.t.start();
+ destroyed = true;
+ processorThread = null;
- if ( log.isDebugEnabled() )
- {
- log.debug( "Constructed: " + this );
- }
- }
+ }
- /**
- * Event Q is emtpy.
- */
- public synchronized void destroy()
- {
- if ( !this.destroyed )
- {
- this.destroyed = true;
+ /**
+ * Returns the time to wait for events before killing the background thread.
+ */
+ public int getWaitToDieMillis()
+ {
+ return waitToDieMillis;
+ }
- // sychronize on queue so the thread will not wait forever,
- // and then interrupt the QueueProcessor
+ /**
+ * Sets the time to wait for events before killing the background thread.
+ */
+ public void setWaitToDieMillis( int wtdm)
+ {
+ waitToDieMillis = wtdm;
+ }
- synchronized ( this.queueLock )
- {
- this.t.interrupt();
- }
+ /**
+ * @return
+ */
+ public String toString()
+ {
+ return "CacheEventQueue [listenerId=" + listenerId + ", cacheName=" +
+ cacheName + "]";
+ }
+
+ /**
+ * @return The {3} value
+ */
+ public boolean isAlive()
+ {
+ return ( !destroyed );
+ }
+
+ public void setAlive( boolean aState )
+ {
+ destroyed = !aState;
+ }
+
+ /**
+ * @return The {3} value
+ */
+ public long getListenerId()
+ {
+ return listenerId;
+ }
+
+ /**
+ * Event Q is emtpy.
+ */
+ public synchronized void destroy()
+ {
+ if ( !destroyed )
+ {
+ destroyed = true;
- this.t = null;
+ // sychronize on queue so the thread will not wait forever,
+ // and then interrupt the QueueProcessor
- log.info( "Cache event queue destroyed: " + this );
+ if ( processorThread != null )
+ {
+ synchronized ( queueLock )
+ {
+ processorThread.interrupt();
}
+ }
+ processorThread = null;
+
+ log.info( "Cache event queue destroyed: " + this );
}
+ }
- /**
- * @return
- */
- public String toString()
+ /**
+ * @param ce
+ * The feature to be added to the PutEvent attribute
+ * @exception IOException
+ */
+ public synchronized void addPutEvent( ICacheElement ce ) throws IOException
+ {
+ if ( isWorking() )
{
- return "listenerId=" + this.listenerId + ", cacheName=" + this.cacheName;
+ put( new PutEvent( ce ) );
}
-
- /**
- * @return The {3} value
- */
- public boolean isAlive()
+ else
{
- return ( !this.destroyed );
+ if ( log.isWarnEnabled() )
+ {
+ log.warn( "Not enqueuing Put Event for [" +
+ this +"] because it's non-functional." );
+ }
}
+ }
- /**
- * @return The {3} value
- */
- public long getListenerId()
+ /**
+ * @param key
+ * The feature to be added to the RemoveEvent attribute
+ * @exception IOException
+ */
+ public synchronized void addRemoveEvent( Serializable key ) throws IOException
+ {
+ if ( isWorking() )
{
- return this.listenerId;
+ put( new RemoveEvent( key ) );
}
-
- /**
- * @param ce The feature to be added to the PutEvent attribute
- * @exception IOException
- */
- public synchronized void addPutEvent( ICacheElement ce )
- throws IOException
+ else
{
- if ( !this.destroyed )
- {
- put( new PutEvent( ce ) );
- }
+ if ( log.isWarnEnabled() )
+ {
+ log.warn( "Not enqueuing Remove Event for [" +
+ this +"] because it's non-functional." );
+ }
}
+ }
- /**
- * @param key The feature to be added to the RemoveEvent attribute
- * @exception IOException
- */
- public synchronized void addRemoveEvent( Serializable key )
- throws IOException
+ /**
+ * @exception IOException
+ */
+ public synchronized void addRemoveAllEvent() throws IOException
+ {
+ if ( isWorking() )
{
- if ( !this.destroyed )
- {
- put( new RemoveEvent( key ) );
- }
+ put( new RemoveAllEvent() );
}
-
- /**
- * @exception IOException
- */
- public synchronized void addRemoveAllEvent()
- throws IOException
+ else
{
- if ( !this.destroyed )
- {
- put( new RemoveAllEvent() );
- }
+ if ( log.isWarnEnabled() )
+ {
+ log.warn( "Not enqueuing RemoveAll Event for [" +
+ this +"] because it's non-functional." );
+ }
}
+ }
- /**
- * @exception IOException
- */
- public synchronized void addDisposeEvent()
- throws IOException
+ /**
+ * @exception IOException
+ */
+ public synchronized void addDisposeEvent() throws IOException
+ {
+ if ( isWorking() )
{
- if ( !this.destroyed )
- {
- put( new DisposeEvent() );
- }
+ put( new DisposeEvent() );
+ }
+ else
+ {
+ if ( log.isWarnEnabled() )
+ {
+ log.warn( "Not enqueuing Dispose Event for [" +
+ this +"] because it's non-functional." );
+ }
}
+ }
- /**
- * Adds an event to the queue.
- *
- * @param event
- */
- private void put( AbstractCacheEvent event )
+ /**
+ * Adds an event to the queue.
+ *
+ * @param event
+ */
+ private void put( AbstractCacheEvent event )
+ {
+ Node newNode = new Node();
+ if ( log.isDebugEnabled() )
{
- Node newNode = new Node();
+ log.debug( "Event entering Queue for " + cacheName + ": " + event );
+ }
- newNode.event = event;
+ newNode.event = event;
- synchronized ( this.queueLock )
+ synchronized ( queueLock )
+ {
+ tail.next = newNode;
+ tail = newNode;
+ if ( isWorking() )
+ {
+ if ( !isAlive() )
{
- this.tail.next = newNode;
- this.tail = newNode;
-
- this.queueLock.notify();
+ destroyed = false;
+ processorThread = new QProcessor( this );
+ processorThread.start();
+ }
+ else
+ {
+ queueLock.notify();
}
+ }
}
+ }
- private AbstractCacheEvent take() throws InterruptedException
+ /**
+ * Returns the next cache event from the queue or null if there are no events
+ * in the queue.
+ *
+ */
+ private AbstractCacheEvent take()
+ {
+ synchronized ( queueLock )
{
- synchronized ( this.queueLock )
- {
- // wait until there is something to read
+ // wait until there is something to read
+ if ( head == tail )
+ {
+ return null;
+ }
- while ( this.head == this.tail )
- {
- this.queueLock.wait();
- }
+ Node node = head.next;
- // we have the lock, and the list is not empty
+ AbstractCacheEvent value = node.event;
- Node node = this.head.next;
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "head.event = " + head.event );
+ log.debug( "node.event = " + node.event );
+ }
- // This is an awful bug. This will always return null.
- // This make the event Q and event destroyer.
- //AbstractCacheEvent value = head.event;
+ // Node becomes the new head (head is always empty)
- // corrected
- AbstractCacheEvent value = node.event;
+ node.event = null;
+ head = node;
- if ( log.isDebugEnabled() )
- {
- log.debug( "head.event = " + this.head.event );
- log.debug( "node.event = " + node.event );
- }
+ return value;
+ }
+ }
- // Node becomes the new head (head is always empty)
+ ///////////////////////////// Inner classes /////////////////////////////
- node.event = null;
- this.head = node;
+ private static class Node
+ {
+ Node next = null;
+ AbstractCacheEvent event = null;
+ }
- return value;
- }
- }
+ /**
+ * @author asmuts @created January 15, 2002
+ */
+ private class QProcessor
+ extends Thread
+ {
+ CacheEventQueue queue;
+ /**
+ * Constructor for the QProcessor object
+ */
+ QProcessor( CacheEventQueue aQueue )
+ {
- ///////////////////////////// Inner classes /////////////////////////////
+ super( "CacheEventQueue.QProcessor-" + aQueue.cacheName );
- private static class Node
- {
- Node next = null;
- AbstractCacheEvent event = null;
+ setDaemon( true );
+ queue = aQueue;
}
/**
+ * Main processing method for the QProcessor object.
+ *
+ * Waits for a specified time (waitToDieMillis) for something to come in
+ * and if no new events come in during that period the run method can exit
+ * and the thread is dereferenced.
*/
- private class QProcessor extends Thread
+ public void run()
{
- /**
- * Constructor for the QProcessor object
- */
- QProcessor()
- {
- super( "CacheEventQueue.QProcessor-" + ( ++CacheEventQueue.this.processorInstanceCount ) );
+ AbstractCacheEvent r = null;
- setDaemon( true );
+ while ( queue.isAlive() )
+ {
+ r = queue.take();
+
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Event from queue = " + r );
}
- /**
- * Main processing method for the QProcessor object
- */
- public void run()
+ if ( r == null )
{
- AbstractCacheEvent r = null;
+ synchronized ( queueLock )
+ {
+ try
+ {
+ queueLock.wait( queue.getWaitToDieMillis() );
+ }
+ catch ( InterruptedException e )
+ {
+ log.warn(
+ "Interrupted while waiting for another event to come in before we die." );
+ return;
+ }
+ r = queue.take();
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Event from queue after sleep = " + r );
+ }
+ if ( r == null )
+ {
+ queue.stopProcessing();
+ }
+ }
+ }
- while ( !CacheEventQueue.this.destroyed )
+ if ( queue.isWorking() && queue.isAlive() && r != null )
+ {
+ r.run();
+ }
+ }
+ if ( log.isInfoEnabled() )
+ {
+ log.info( "QProcessor exiting for " + queue );
+ }
+ }
+ }
+
+ /**
+ * Retries before declaring failure.
+ *
+ * @author asmuts @created January 15, 2002
+ */
+ private abstract class AbstractCacheEvent
+ implements Runnable
+ {
+ int failures = 0;
+ boolean done = false;
+
+ /**
+ * Main processing method for the AbstractCacheEvent object
+ */
+ public void run()
+ {
+ try
+ {
+ doRun();
+ }
+ catch ( IOException e )
+ {
+ if ( log.isWarnEnabled() )
+ {
+ log.warn( e );
+ }
+ if ( ++failures >= maxFailure )
+ {
+ if ( log.isWarnEnabled() )
+ {
+ log.warn(
+ "Error while running event from Queue: "
+ + this
+ +". Dropping Event and marking Event Queue as non-functional." );
+ }
+ setWorking( false );
+ setAlive( false );
+ return;
+ }
+ else
+ {
+ if ( log.isInfoEnabled() )
+ {
+ log.info( "Error while running event from Queue: " +
+ this +". Retrying..." );
+ }
+ try
+ {
+ Thread.sleep( waitBeforeRetry );
+ run();
+ }
+ catch ( InterruptedException ie )
+ {
+ if ( log.isErrorEnabled() )
{
- try
- {
- r = take();
-
- if ( log.isDebugEnabled() )
- {
- log.debug( "r from take() = " + r );
- }
-
- }
- catch ( InterruptedException e )
- {
- // We were interrupted, just continue -- the while loop
- // will exit if we have been properly destroyed.
- }
-
- if ( !CacheEventQueue.this.destroyed && r != null )
- {
- r.run();
- }
+ log.warn( "Interrupted while sleeping for retry on event " +
+ this +"." );
}
- // declare failure as listener is permanently unreachable.
- // queue = null;
- CacheEventQueue.this.listener = null;
- // The listener failure logging more the problem of the user
- // of the q.
- log.info( "QProcessor exiting for " + CacheEventQueue.this );
+ setWorking( false );
+ setAlive( false );
+ }
}
+ }
}
/**
- * Retries before declaring failure.
- *
+ * @exception IOException
*/
- private abstract class AbstractCacheEvent implements Runnable
- {
- /**
- * Main processing method for the AbstractCacheEvent object
- */
- public void run()
- {
- IOException ex = null;
+ protected abstract void doRun() throws IOException;
+ }
- while ( !CacheEventQueue.this.destroyed
- && CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
- {
- try
- {
- ex = null;
- doRun();
- CacheEventQueue.this.failureCount = 0;
- return;
- // happy and done.
- }
- catch ( IOException e )
- {
- CacheEventQueue.this.failureCount++;
- ex = e;
- }
- // Let's get idle for a while before retry.
- if ( !CacheEventQueue.this.destroyed
- && CacheEventQueue.this.failureCount <= CacheEventQueue.this.maxFailure )
- {
- try
- {
- log.warn( "...retrying propagation " + CacheEventQueue.this + "..." + CacheEventQueue.this.failureCount );
- Thread.currentThread().sleep( CacheEventQueue.this.waitBeforeRetry );
- }
- catch ( InterruptedException ie )
- {
- // ignore;
- }
- }
- }
- // Too bad. The remote host is unreachable, so we give up.
- if ( ex != null )
- {
- log.warn( "Giving up propagation " + CacheEventQueue.this, ex );
+ /**
+ * @author asmuts @created January 15, 2002
+ */
+ private class PutEvent
+ extends AbstractCacheEvent
+ {
- destroy();
- }
- return;
- }
+ private ICacheElement ice;
- /**
- * Description of the Method
- *
- * @exception IOException
- */
- protected abstract void doRun()
- throws IOException;
+ /**
+ * Constructor for the PutEvent object
+ *
+ * @param ice
+ * @exception IOException
+ */
+ PutEvent( ICacheElement ice ) throws IOException
+ {
+ this.ice = ice;
+ /*
+ * this.key = key; this.obj = CacheUtils.dup(obj); this.attr = attr; this.groupName = groupName;
+ */
}
/**
+ * Description of the Method
+ *
+ * @exception IOException
*/
- private class PutEvent extends AbstractCacheEvent
+ protected void doRun() throws IOException
{
+ /*
+ * CacheElement ce = new CacheElement(cacheName, key, obj); ce.setElementAttributes( attr ); ce.setGroupName(
+ * groupName );
+ */
+ listener.handlePut( ice );
+ }
- private ICacheElement ice;
-
- /**
- * Constructor for the PutEvent object
- *
- * @param ice
- * @exception IOException
- */
- PutEvent( ICacheElement ice )
- throws IOException
- {
- this.ice = ice;
- /*
- * this.key = key;
- * this.obj = CacheUtils.dup(obj);
- * this.attr = attr;
- * this.groupName = groupName;
- */
- }
+ public String toString()
+ {
+ return new StringBuffer( "PutEvent for key: " )
+ .append( ice.getKey() )
+ .append( " value: " )
+ .append( ice.getVal() )
+ .toString();
+ }
+
+ }
+
+ /**
+ * Description of the Class
+ *
+ * @author asmuts @created January 15, 2002
+ */
+ private class RemoveEvent
+ extends AbstractCacheEvent
+ {
+ private Serializable key;
- /**
- * Description of the Method
- *
- * @exception IOException
- */
- protected void doRun()
- throws IOException
- {
- /*
- * CacheElement ce = new CacheElement(cacheName, key, obj);
- * ce.setElementAttributes( attr );
- * ce.setGroupName( groupName );
- */
- CacheEventQueue.this.listener.handlePut( ice );
- }
+ /**
+ * Constructor for the RemoveEvent object
+ *
+ * @param key
+ * @exception IOException
+ */
+ RemoveEvent( Serializable key ) throws IOException
+ {
+ this.key = key;
}
/**
- * Description of the Class
+ * Description of the Method
*
+ * @exception IOException
*/
- private class RemoveEvent extends AbstractCacheEvent
+ protected void doRun() throws IOException
{
- private Serializable key;
-
- /**
- * Constructor for the RemoveEvent object
- *
- * @param key
- * @exception IOException
- */
- RemoveEvent( Serializable key )
- throws IOException
- {
- this.key = key;
- }
+ listener.handleRemove( cacheName, key );
+ }
- /**
- * Description of the Method
- *
- * @exception IOException
- */
- protected void doRun()
- throws IOException
- {
- CacheEventQueue.this.listener.handleRemove( CacheEventQueue.this.cacheName, key );
- }
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ public String toString()
+ {
+ return new StringBuffer( "RemoveEvent for " ).append( key ).toString();
}
+ }
+
+ /**
+ * Description of the Class
+ *
+ * @author asmuts @created January 15, 2002
+ */
+ private class RemoveAllEvent
+ extends AbstractCacheEvent
+ {
+
/**
- * Description of the Class
+ * Description of the Method
+ *
+ * @exception IOException
+ */
+ protected void doRun() throws IOException
+ {
+ listener.handleRemoveAll( cacheName );
+ }
+
+ /*
+ * (non-Javadoc)
*
+ * @see java.lang.Object#toString()
*/
- private class RemoveAllEvent extends AbstractCacheEvent
+ public String toString()
{
- /**
- * Description of the Method
- *
- * @exception IOException
- */
- protected void doRun()
- throws IOException
- {
- CacheEventQueue.this.listener.handleRemoveAll( CacheEventQueue.this.cacheName );
- }
+ return "RemoveAllEvent";
}
+ }
+
+ /**
+ * Description of the Class
+ *
+ * @author asmuts @created January 15, 2002
+ */
+ private class DisposeEvent
+ extends AbstractCacheEvent
+ {
+
/**
- * Description of the Class
+ * Called when gets to the end of the queue
*
+ * @exception IOException
*/
- private class DisposeEvent extends AbstractCacheEvent
+ protected void doRun() throws IOException
{
- /**
- * Description of the Method
- *
- * @exception IOException
- */
- protected void doRun()
- throws IOException
- {
- CacheEventQueue.this.listener.handleDispose( CacheEventQueue.this.cacheName );
- }
+ listener.handleDispose( cacheName );
}
-}
+ public String toString()
+ {
+ return "DisposeEvent";
+ }
+ }
+
+ /**
+ * @return
+ */
+ public boolean isWorking()
+ {
+ return working;
+ }
+
+ /**
+ * @param b
+ */
+ public void setWorking( boolean b )
+ {
+ working = b;
+ }
+
+ public boolean isEmpty()
+ {
+ return tail == head;
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org