You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2009/06/18 19:15:59 UTC

svn commit: r786170 - in /jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control: CompositeCache.java event/ElementEventQueue.java

Author: asmuts
Date: Thu Jun 18 17:15:59 2009
New Revision: 786170

URL: http://svn.apache.org/viewvc?rev=786170&view=rev
Log:
Making the element event queue a non-singleton in preparation for multi-instance JCS.

Modified:
    jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java?rev=786170&r1=786169&r2=786170&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/CompositeCache.java Thu Jun 18 17:15:59 2009
@@ -74,11 +74,10 @@
     private final static Log log = LogFactory.getLog( CompositeCache.class );
 
     /**
-     * EventQueue for handling element events. 1 should be enough for all the regions. Else should
-     * create as needed per region.
+     * EventQueue for handling element events. Lazy initialized. One for each region. To be more efficient, the manager
+     * should pass a shared queue in.
      */
-    // TODO fix for multi-instance JCS, have the manager pass this in.
-    public static IElementEventQueue elementEventQ = new ElementEventQueue( "AllRegionQueue" );
+    public IElementEventQueue elementEventQ;
 
     /** Auxiliary caches. */
     private AuxiliaryCache[] auxCaches = new AuxiliaryCache[0];
@@ -167,7 +166,7 @@
      * @param ce
      * @exception IOException
      */
-    public synchronized void update( ICacheElement ce )
+    public void update( ICacheElement ce )
         throws IOException
     {
         update( ce, false );
@@ -179,7 +178,7 @@
      * @param ce
      * @exception IOException
      */
-    public synchronized void localUpdate( ICacheElement ce )
+    public void localUpdate( ICacheElement ce )
         throws IOException
     {
         update( ce, true );
@@ -193,11 +192,9 @@
      * @param localOnly Whether the operation should be restricted to local auxiliaries.
      * @exception IOException
      */
-    protected synchronized void update( ICacheElement cacheElement, boolean localOnly )
+    protected void update( ICacheElement cacheElement, boolean localOnly )
         throws IOException
     {
-        // not thread safe, but just for debugging and testing.
-        updateCount++;
 
         if ( cacheElement.getKey() instanceof String
             && cacheElement.getKey().toString().endsWith( CacheConstants.NAME_COMPONENT_DELIMITER ) )
@@ -215,9 +212,14 @@
             log.debug( "Updating memory cache" );
         }
 
-        memCache.update( cacheElement );
+        synchronized ( this )
+        {
+            updateCount++;
+
+            memCache.update( cacheElement );
 
-        updateAuxiliaries( cacheElement, localOnly );
+            updateAuxiliaries( cacheElement, localOnly );
+        }
     }
 
     /**
@@ -572,7 +574,7 @@
                             break;
                         }
                     }
-                }
+                }                
             }
         }
         catch ( Exception e )
@@ -1681,6 +1683,7 @@
      * <p>
      * This does not call handle directly; instead the handler and the event are put into a queue.
      * This prevents the event handling from blocking normal cache operations.
+     * <p>
      * @param ce
      * @param eventType
      */
@@ -1713,6 +1716,7 @@
 
     /**
      * Adds an ElementEvent to be handled to the queue.
+     * <p>
      * @param hand The IElementEventHandler
      * @param event The IElementEventHandler IElementEvent event
      * @exception IOException Description of the Exception
@@ -1724,6 +1728,14 @@
         {
             log.debug( "Adding event to Element Event Queue" );
         }
+        // lazy init
+        synchronized ( this )
+        {
+            if ( elementEventQ == null )
+            {
+                elementEventQ = new ElementEventQueue( this.getCacheName() );
+            }
+        }
         elementEventQ.addElementEvent( hand, event );
     }
 

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java?rev=786170&r1=786169&r2=786170&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/engine/control/event/ElementEventQueue.java Thu Jun 18 17:15:59 2009
@@ -36,17 +36,23 @@
     /** The logger */
     private final static Log log = LogFactory.getLog( ElementEventQueue.class );
 
-    /** number of processors */
-    private static int processorInstanceCount = 0;
-
     /** The cache (region) name. */
     private String cacheName;
 
+    /** default */
+    private static final int DEFAULT_WAIT_TO_DIE_MILLIS = 10000;
+
+    /**
+     * time to wait for an event before snuffing the background thread if the queue is empty. make
+     * configurable later
+     */
+    private int waitToDieMillis = DEFAULT_WAIT_TO_DIE_MILLIS;
+
     /** shutdown or not */
     private boolean destroyed = false;
 
     /** The worker thread. */
-    private Thread t;
+    private Thread processorThread;
 
     /** Internal queue implementation */
     private Object queueLock = new Object();
@@ -57,17 +63,20 @@
     /** tail of the doubly linked list */
     private Node tail = head;
 
+    /** Number of items in the queue */
+    private int size = 0;
+
     /**
      * Constructor for the ElementEventQueue object
+     * <p>
      * @param cacheName
      */
     public ElementEventQueue( String cacheName )
     {
-
         this.cacheName = cacheName;
 
-        t = new QProcessor();
-        t.start();
+        processorThread = new QProcessor( this );
+        processorThread.start();
 
         if ( log.isDebugEnabled() )
         {
@@ -76,7 +85,7 @@
     }
 
     /**
-     * Event Q is emtpy.
+     * Event Q is empty.
      */
     public synchronized void destroy()
     {
@@ -84,21 +93,53 @@
         {
             destroyed = true;
 
-            // sychronize on queue so the thread will not wait forever,
+            // synchronize on queue so the thread will not wait forever,
             // and then interrupt the QueueProcessor
-
             synchronized ( queueLock )
             {
-                t.interrupt();
+                processorThread.interrupt();
             }
 
-            t = null;
+            processorThread = null;
 
-            log.info( "Element event queue destroyed: " + this );
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Element event queue destroyed: " + this );
+            }
         }
     }
 
     /**
+     * Kill the processor thread and indicate that the queue is destroyed and no longer alive, but it
+     * can still be working.
+     */
+    public synchronized void stopProcessing()
+    {
+        destroyed = true;
+        processorThread = null;
+    }
+
+    /**
+     * Returns the time to wait for events before killing the background thread.
+     * <p>
+     * @return int
+     */
+    public int getWaitToDieMillis()
+    {
+        return waitToDieMillis;
+    }
+
+    /**
+     * Sets the time to wait for events before killing the background thread.
+     * <p>
+     * @param wtdm the ms for the q to sit idle.
+     */
+    public void setWaitToDieMillis( int wtdm )
+    {
+        waitToDieMillis = wtdm;
+    }
+
+    /**
      * @return the region name for the event queue
      */
     public String toString()
@@ -107,6 +148,16 @@
     }
 
     /**
+     * Returns the number of elements in the queue.
+     * <p>
+     * @return number of items in the queue.
+     */
+    public int size()
+    {
+        return size;
+    }
+
+    /**
      * @return The destroyed value
      */
     public boolean isAlive()
@@ -154,9 +205,19 @@
 
         synchronized ( queueLock )
         {
+            size++;
             tail.next = newNode;
             tail = newNode;
-
+            if ( !isAlive() )
+            {
+                destroyed = false;
+                processorThread = new QProcessor( this );
+                processorThread.start();
+            }
+            else
+            {
+                queueLock.notify();
+            }
             queueLock.notify();
         }
     }
@@ -165,32 +226,17 @@
      * Returns the next item on the queue, or waits if empty.
      * <p>
      * @return AbstractElementEventRunner
-     * @throws InterruptedException
      */
     private AbstractElementEventRunner take()
-        throws InterruptedException
     {
         synchronized ( queueLock )
         {
             // wait until there is something to read
-
-            while ( head == tail )
+            if ( head == tail )
             {
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "Waiting for something to come into the Q" );
-                }
-
-                queueLock.wait();
-
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "Something came into the Q" );
-                }
+                return null;
             }
 
-            // we have the lock, and the list is not empty
-
             Node node = head.next;
 
             AbstractElementEventRunner value = node.event;
@@ -206,6 +252,7 @@
             node.event = null;
             head = node;
 
+            size--;
             return value;
         }
     }
@@ -227,48 +274,75 @@
     private class QProcessor
         extends Thread
     {
+        /** The event queue */
+        ElementEventQueue queue;
+
         /**
          * Constructor for the QProcessor object
+         * <p>
+         * @param aQueue 
          */
-        QProcessor()
+        QProcessor( ElementEventQueue aQueue )
         {
-            super( "ElementEventQueue.QProcessor-" + ( ++processorInstanceCount ) );
+            super( "ElementEventQueue.QProcessor-" + aQueue.cacheName );
 
             setDaemon( true );
+            queue = aQueue;
         }
 
         /**
-         * Main processing method for the QProcessor object
+         * Main processing method for the QProcessor object.
+         * <p>
+         * Waits for a specified time (waitToDieMillis) for something to come in and if no new
+         * events come in during that period the run method can exit and the thread is dereferenced.
          */
         public void run()
         {
-            AbstractElementEventRunner r = null;
+            AbstractElementEventRunner event = null;
 
-            while ( !destroyed )
+            while ( queue.isAlive() )
             {
-                try
+                event = queue.take();
+
+                if ( log.isDebugEnabled() )
                 {
-                    r = take();
+                    log.debug( "Event from queue = " + event );
+                }
 
-                    if ( log.isDebugEnabled() )
+                if ( event == null )
+                {
+                    synchronized ( queueLock )
                     {
-                        log.debug( "r from take() = " + r );
+                        try
+                        {
+                            queueLock.wait( queue.getWaitToDieMillis() );
+                        }
+                        catch ( InterruptedException e )
+                        {
+                            log.warn( "Interrupted while waiting for another event to come in before we die." );
+                            return;
+                        }
+                        event = queue.take();
+                        if ( log.isDebugEnabled() )
+                        {
+                            log.debug( "Event from queue after sleep = " + event );
+                        }
+                    }
+                    if ( event == null )
+                    {
+                        queue.stopProcessing();
                     }
-
-                }
-                catch ( InterruptedException e )
-                {
-                    // We were interrupted, so terminate gracefully.
-                    this.destroy();
                 }
 
-                if ( !destroyed && r != null )
+                if ( queue.isAlive() && event != null )
                 {
-                    r.run();
+                    event.run();
                 }
             }
-
-            log.info( "QProcessor exiting for " + ElementEventQueue.this );
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "QProcessor exiting for " + queue );
+            }
         }
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jcs-dev-help@jakarta.apache.org