You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2009/06/18 19:15:59 UTC
svn commit: r786170 - in
/jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control:
CompositeCache.java event/ElementEventQueue.java
Author: asmuts
Date: Thu Jun 18 17:15:59 2009
New Revision: 786170
URL: http://svn.apache.org/viewvc?rev=786170&view=rev
Log:
Making the element event queue a non-singleton in preparation for multi-instance JCS.
Modified:
jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java
jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java?rev=786170&r1=786169&r2=786170&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java Thu Jun 18 17:15:59 2009
@@ -74,11 +74,10 @@
private final static Log log = LogFactory.getLog( CompositeCache.class );
/**
- * EventQueue for handling element events. 1 should be enough for all the regions. Else should
- * create as needed per region.
+ * EventQueue for handling element events. Lazily initialized. One for each region. To be more efficient, the manager
+ * should pass a shared queue in.
*/
- // TODO fix for multi-instance JCS, have the manager pass this in.
- public static IElementEventQueue elementEventQ = new ElementEventQueue( "AllRegionQueue" );
+ public IElementEventQueue elementEventQ;
/** Auxiliary caches. */
private AuxiliaryCache[] auxCaches = new AuxiliaryCache[0];
@@ -167,7 +166,7 @@
* @param ce
* @exception IOException
*/
- public synchronized void update( ICacheElement ce )
+ public void update( ICacheElement ce )
throws IOException
{
update( ce, false );
@@ -179,7 +178,7 @@
* @param ce
* @exception IOException
*/
- public synchronized void localUpdate( ICacheElement ce )
+ public void localUpdate( ICacheElement ce )
throws IOException
{
update( ce, true );
@@ -193,11 +192,9 @@
* @param localOnly Whether the operation should be restricted to local auxiliaries.
* @exception IOException
*/
- protected synchronized void update( ICacheElement cacheElement, boolean localOnly )
+ protected void update( ICacheElement cacheElement, boolean localOnly )
throws IOException
{
- // not thread safe, but just for debugging and testing.
- updateCount++;
if ( cacheElement.getKey() instanceof String
&& cacheElement.getKey().toString().endsWith( CacheConstants.NAME_COMPONENT_DELIMITER ) )
@@ -215,9 +212,14 @@
log.debug( "Updating memory cache" );
}
- memCache.update( cacheElement );
+ synchronized ( this )
+ {
+ updateCount++;
+
+ memCache.update( cacheElement );
- updateAuxiliaries( cacheElement, localOnly );
+ updateAuxiliaries( cacheElement, localOnly );
+ }
}
/**
@@ -572,7 +574,7 @@
break;
}
}
- }
+ }
}
}
catch ( Exception e )
@@ -1681,6 +1683,7 @@
* <p>
* This does not call handle directly; instead the handler and the event are put into a queue.
* This prevents the event handling from blocking normal cache operations.
+ * <p>
* @param ce
* @param eventType
*/
@@ -1713,6 +1716,7 @@
/**
* Adds an ElementEvent to be handled to the queue.
+ * <p>
* @param hand The IElementEventHandler
* @param event The IElementEventHandler IElementEvent event
* @exception IOException Description of the Exception
@@ -1724,6 +1728,14 @@
{
log.debug( "Adding event to Element Event Queue" );
}
+ // lazy init
+ synchronized ( this )
+ {
+ if ( elementEventQ == null )
+ {
+ elementEventQ = new ElementEventQueue( this.getCacheName() );
+ }
+ }
elementEventQ.addElementEvent( hand, event );
}
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java?rev=786170&r1=786169&r2=786170&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java Thu Jun 18 17:15:59 2009
@@ -36,17 +36,23 @@
/** The logger */
private final static Log log = LogFactory.getLog( ElementEventQueue.class );
- /** number of processors */
- private static int processorInstanceCount = 0;
-
/** The cache (region) name. */
private String cacheName;
+ /** Default time, in milliseconds, to wait for an event before the idle background thread is stopped. */
+ private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
+
+ /**
+ * Time to wait for an event before snuffing the background thread if the queue is empty. Make
+ * configurable later.
+ */
+ private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
+
/** shutdown or not */
private boolean destroyed = false;
/** The worker thread. */
- private Thread t;
+ private Thread processorThread;
/** Internal queue implementation */
private Object queueLock = new Object();
@@ -57,17 +63,20 @@
/** tail of the doubly linked list */
private Node tail = head;
+ /** Number of items in the queue */
+ private int size = 0;
+
/**
* Constructor for the ElementEventQueue object
+ * <p>
* @param cacheName
*/
public ElementEventQueue( String cacheName )
{
-
this.cacheName = cacheName;
- t = new QProcessor();
- t.start();
+ processorThread = new QProcessor( this );
+ processorThread.start();
if ( log.isDebugEnabled() )
{
@@ -76,7 +85,7 @@
}
/**
- * Event Q is emtpy.
+ * Event Q is empty.
*/
public synchronized void destroy()
{
@@ -84,21 +93,53 @@
{
destroyed = true;
- // sychronize on queue so the thread will not wait forever,
+ // synchronize on queue so the thread will not wait forever,
// and then interrupt the QueueProcessor
-
synchronized ( queueLock )
{
- t.interrupt();
+ processorThread.interrupt();
}
- t = null;
+ processorThread = null;
- log.info( "Element event queue destroyed: " + this );
+ if ( log.isInfoEnabled() )
+ {
+ log.info( "Element event queue destroyed: " + this );
+ }
}
}
/**
+ * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
+ * can still be working.
+ */
+ public synchronized void stopProcessing()
+ {
+ destroyed = true;
+ processorThread = null;
+ }
+
+ /**
+ * Returns the time to wait for events before killing the background thread.
+ * <p>
+ * @return int
+ */
+ public int getWaitToDieMillis()
+ {
+ return waitToDieMillis;
+ }
+
+ /**
+ * Sets the time to wait for events before killing the background thread.
+ * <p>
+ * @param wtdm the ms for the q to sit idle.
+ */
+ public void setWaitToDieMillis( int wtdm )
+ {
+ waitToDieMillis = wtdm;
+ }
+
+ /**
* @return the region name for the event queue
*/
public String toString()
@@ -107,6 +148,16 @@
}
/**
+ * Returns the number of elements in the queue.
+ * <p>
+ * @return number of items in the queue.
+ */
+ public int size()
+ {
+ return size;
+ }
+
+ /**
* @return The destroyed value
*/
public boolean isAlive()
@@ -154,9 +205,19 @@
synchronized ( queueLock )
{
+ size++;
tail.next = newNode;
tail = newNode;
-
+ if ( !isAlive() )
+ {
+ destroyed = false;
+ processorThread = new QProcessor( this );
+ processorThread.start();
+ }
+ else
+ {
+ queueLock.notify();
+ }
queueLock.notify();
}
}
@@ -165,32 +226,17 @@
* Returns the next item on the queue, or waits if empty.
* <p>
* @return AbstractElementEventRunner
- * @throws InterruptedException
*/
private AbstractElementEventRunner take()
- throws InterruptedException
{
synchronized ( queueLock )
{
// wait until there is something to read
-
- while ( head == tail )
+ if ( head == tail )
{
- if ( log.isDebugEnabled() )
- {
- log.debug( "Waiting for something to come into the Q" );
- }
-
- queueLock.wait();
-
- if ( log.isDebugEnabled() )
- {
- log.debug( "Something came into the Q" );
- }
+ return null;
}
- // we have the lock, and the list is not empty
-
Node node = head.next;
AbstractElementEventRunner value = node.event;
@@ -206,6 +252,7 @@
node.event = null;
head = node;
+ size--;
return value;
}
}
@@ -227,48 +274,75 @@
private class QProcessor
extends Thread
{
+ /** The event queue */
+ ElementEventQueue queue;
+
/**
* Constructor for the QProcessor object
+ * <p>
+ * @param aQueue
*/
- QProcessor()
+ QProcessor( ElementEventQueue aQueue )
{
- super( "ElementEventQueue.QProcessor-" + ( ++processorInstanceCount ) );
+ super( "ElementEventQueue.QProcessor-" + aQueue.cacheName );
setDaemon( true );
+ queue = aQueue;
}
/**
- * Main processing method for the QProcessor object
+ * Main processing method for the QProcessor object.
+ * <p>
+ * Waits for a specified time (waitToDieMillis) for something to come in and if no new
+ * events come in during that period the run method can exit and the thread is dereferenced.
*/
public void run()
{
- AbstractElementEventRunner r = null;
+ AbstractElementEventRunner event = null;
- while ( !destroyed )
+ while ( queue.isAlive() )
{
- try
+ event = queue.take();
+
+ if ( log.isDebugEnabled() )
{
- r = take();
+ log.debug( "Event from queue = " + event );
+ }
- if ( log.isDebugEnabled() )
+ if ( event == null )
+ {
+ synchronized ( queueLock )
{
- log.debug( "r from take() = " + r );
+ try
+ {
+ queueLock.wait( queue.getWaitToDieMillis() );
+ }
+ catch ( InterruptedException e )
+ {
+ log.warn( "Interrupted while waiting for another event to come in before we die." );
+ return;
+ }
+ event = queue.take();
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Event from queue after sleep = " + event );
+ }
+ }
+ if ( event == null )
+ {
+ queue.stopProcessing();
}
-
- }
- catch ( InterruptedException e )
- {
- // We were interrupted, so terminate gracefully.
- this.destroy();
}
- if ( !destroyed && r != null )
+ if ( queue.isAlive() && event != null )
{
- r.run();
+ event.run();
}
}
-
- log.info( "QProcessor exiting for " + ElementEventQueue.this );
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "QProcessor exiting for " + queue );
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jcs-dev-help@jakarta.apache.org