You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@turbine.apache.org by jt...@apache.org on 2002/02/14 21:42:44 UTC

cvs commit: jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/utils/reuse IFastQueue.java ITomcatQueue.java TomcatJglQueue.java TomcatQueue.java

jtaylor     02/02/14 12:42:44

  Modified:    .        build-test.xml build.xml default.properties
               src/java/org/apache/stratum/jcs/engine CacheEventQueue.java
               src/java/org/apache/stratum/jcs/utils/log LogEventQueue.java
  Removed:     src/java/org/apache/stratum/jcs/utils/reuse IFastQueue.java
                        ITomcatQueue.java TomcatJglQueue.java
                        TomcatQueue.java
  Log:
  Moving to LinkedQueue from util.concurrent, removed the other queue. Haven't
  tuned this much yet, no worried about LogEventQueue since it will be replaced
  by commons-logging + log4j (or whatever).
  
  Revision  Changes    Path
  1.6       +1 -0      jakarta-turbine-stratum/build-test.xml
  
  Index: build-test.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-turbine-stratum/build-test.xml,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- build-test.xml	14 Feb 2002 00:57:41 -0000	1.5
  +++ build-test.xml	14 Feb 2002 20:42:44 -0000	1.6
  @@ -17,6 +17,7 @@
       <pathelement location="${commons-collections.jar}"/>
       <pathelement location="${commons-util.jar}"/>
       <pathelement location="${commons-logging.jar}"/>
  +    <pathelement location="${concurrent.jar}"/>
       <pathelement location="${jgl.jar}"/>
       <pathelement location="${jetty.jar}"/>
       <pathelement location="${log4j.jar}"/>
  
  
  
  1.20      +2 -1      jakarta-turbine-stratum/build.xml
  
  Index: build.xml
  ===================================================================
  RCS file: /home/cvs/jakarta-turbine-stratum/build.xml,v
  retrieving revision 1.19
  retrieving revision 1.20
  diff -u -r1.19 -r1.20
  --- build.xml	14 Feb 2002 01:01:23 -0000	1.19
  +++ build.xml	14 Feb 2002 20:42:44 -0000	1.20
  @@ -17,6 +17,7 @@
       <pathelement location="${commons-collections.jar}"/>
       <pathelement location="${commons-util.jar}"/>
       <pathelement location="${commons-logging.jar}"/>
  +    <pathelement location="${concurrent.jar}"/>
       <pathelement location="${jgl.jar}"/>
       <pathelement location="${jetty.jar}"/>
       <pathelement location="${log4j.jar}"/>
  @@ -132,7 +133,7 @@
       <mkdir dir="${build.src}"/>
       <mkdir dir="${build.dest}"/>
       
  -    <copy todir="${build.src}" overwrite="yes">
  +    <copy todir="${build.src}">
         <fileset dir="${src.dir}/java/"/>
       </copy>
       
  
  
  
  1.18      +1 -0      jakarta-turbine-stratum/default.properties
  
  Index: default.properties
  ===================================================================
  RCS file: /home/cvs/jakarta-turbine-stratum/default.properties,v
  retrieving revision 1.17
  retrieving revision 1.18
  diff -u -r1.17 -r1.18
  --- default.properties	14 Feb 2002 12:52:00 -0000	1.17
  +++ default.properties	14 Feb 2002 20:42:44 -0000	1.18
  @@ -46,6 +46,7 @@
   commons-collections.jar = ${lib.repo}/commons-collections.jar
   commons-util.jar = ${lib.repo}/commons-util-0.1-dev.jar
   commons-logging.jar = ${lib.repo}/commons-logging.jar
  +concurrent.jar = ${lib.repo}/concurrent.jar
   dom4j.jar = ${lib.repo}/dom4j-1.1.jar
   junit.jar = ${lib.repo}/junit-3.7.jar
   
  
  
  
  1.4       +100 -114  jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/engine/CacheEventQueue.java
  
  Index: CacheEventQueue.java
  ===================================================================
  RCS file: /home/cvs/jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/engine/CacheEventQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- CacheEventQueue.java	15 Jan 2002 21:33:33 -0000	1.3
  +++ CacheEventQueue.java	14 Feb 2002 20:42:44 -0000	1.4
  @@ -7,66 +7,58 @@
   import org.apache.stratum.jcs.engine.behavior.ICacheEventQueue;
   import org.apache.stratum.jcs.engine.behavior.ICacheListener;
   
  -import org.apache.stratum.jcs.utils.log.Logger;
  -import org.apache.stratum.jcs.utils.log.LoggerManager;
  +import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
   
  -import org.apache.stratum.jcs.utils.reuse.ITomcatQueue;
  -import org.apache.stratum.jcs.utils.reuse.TomcatJglQueue;
  +import org.apache.commons.logging.Log;
  +import org.apache.commons.logging.LogSource;
   
   /**
  - *  An event queue is used to propagate ordered cache events to one and only one
  - *  target listener.
  - *
  - *@author     asmuts
  - *@created    January 15, 2002
  + * An event queue is used to propagate ordered cache events to one and only one
  + * target listener.
    */
   public class CacheEventQueue implements ICacheEventQueue
   {
  -    private static boolean debug = false;
  -    private ITomcatQueue q = new TomcatJglQueue();
  +    private static final Log log = LogSource.getInstance( CacheEventQueue.class );
  +    
  +    private LinkedQueue queue = new LinkedQueue();
  +    
       private ICacheListener listener;
       private byte listenerId;
       private String cacheName;
   
  -    private Logger log;
       private int failureCount;
       private int maxFailure;
  -    private int waitBeforeRetry;
  +    
       // in milliseconds
  +    private int waitBeforeRetry;
  +    
       private boolean destroyed;
       private Thread t;
   
   
       /**
  -     *  Constructs with the specified listener and the cache name.
  -     *
  -     *@param  listener    Description of the Parameter
  -     *@param  listenerId  Description of the Parameter
  -     *@param  cacheName   Description of the Parameter
  +     * Constructs with the specified listener and the cache name.
        */
  -    public CacheEventQueue( ICacheListener listener, byte listenerId, String cacheName )
  +    public CacheEventQueue( ICacheListener listener, 
  +                            byte listenerId, 
  +                            String cacheName )
       {
           this( listener, listenerId, cacheName, 10, 500 );
       }
  -
  -
  +    
       /**
  -     *  Constructor for the CacheEventQueue object
  -     *
  -     *@param  listener         Description of the Parameter
  -     *@param  listenerId       Description of the Parameter
  -     *@param  cacheName        Description of the Parameter
  -     *@param  maxFailure       Description of the Parameter
  -     *@param  waitBeforeRetry  Description of the Parameter
  +     * Constructor for the CacheEventQueue object
        */
  -    public CacheEventQueue( ICacheListener listener, byte listenerId, String cacheName,
  -                            int maxFailure, int waitBeforeRetry )
  +    public CacheEventQueue( ICacheListener listener, 
  +                            byte listenerId, 
  +                            String cacheName,
  +                            int maxFailure, 
  +                            int waitBeforeRetry )
       {
           if ( listener == null )
           {
               throw new IllegalArgumentException( "listener must not be null" );
           }
  -        this.log = LoggerManager.getLogger( this );
   
           this.listener = listener;
           this.listenerId = listenerId;
  @@ -77,141 +69,127 @@
           t = new QProcessor();
           t.start();
   
  -        if ( debug )
  +        if ( log.isDebugEnabled() )
           {
  -            log.debug( "Constructed for " + this );
  +            log.debug( "Constructed: " + this );
           }
       }
   
  -
  -    //////////////////////////////////////////////////////
       /**
  -     *  Description of the Method
  +     *
        */
       public synchronized void destroy()
       {
  -        if ( destroyed )
  -        {
  -            return;
  -        }
  -        destroyed = true;
  -        // sychronize the q so the thread will not wait forever.
  -        synchronized ( q )
  -        {
  -            t.interrupt();
  +        if ( ! destroyed )
  +        {          
  +            destroyed = true;
  +            
  +            // sychronize on queue so the thread will not wait forever, 
  +            // and then interrupt the QueueProcessor
  +
  +            synchronized ( queue )
  +            {
  +                t.interrupt();
  +            }
  +            
  +            t = null;
  +            
  +            log.warn( "Cache event queue destroyed: " + this );
           }
  -        t = null;
  -        log.warn( "*** Cache event queue destroyed " + this );
       }
   
  -
  -    //////////////////////////////////////////////////////
       /**
  -     *  Description of the Method
  -     *
  -     *@return    Description of the Return Value
  +     * 
        */
       public String toString()
       {
           return "listenerId=" + listenerId + ", cacheName=" + cacheName;
       }
   
  -
  -    ////////////////////////////////////////////////////
       /**
  -     *  Gets the alive attribute of the CacheEventQueue object
  -     *
  -     *@return    The alive value
  +     * 
        */
       public boolean isAlive()
       {
  -        return !destroyed;
  +        return ( ! destroyed );
       }
   
  -
  -    /////////////////////////////////////////////////////
       /**
  -     *  Gets the listenerId attribute of the CacheEventQueue object
  -     *
  -     *@return    The listenerId value
  +     * 
        */
       public byte getListenerId()
       {
           return listenerId;
       }
  -
  -
  -    //////////////////////////////////////////////////////////////////////////////
  +    
       /**
  -     *  Adds a feature to the PutEvent attribute of the CacheEventQueue object
  -     *
  -     *@param  ce               The feature to be added to the PutEvent attribute
  -     *@exception  IOException  Description of the Exception
  +     * 
        */
       public synchronized void addPutEvent( ICacheElement ce )
           throws IOException
       {
  -        if ( destroyed )
  +        if ( ! destroyed )
           {
  -            return;
  -            // Cache is dead.
  -        }
  -        if ( debug )
  -        {
  -            log.debug( "addPutEvent> key=" + ( String ) ce.getKey() );
  -        }
  -        q.put( new PutEvent( ce ) );
  -        if ( debug )
  -        {
  -            log.debug( "addPutEvent done> key=" + ( String ) ce.getKey() );
  +            put( new PutEvent( ce ) );
           }
       }
   
  -
       /**
  -     *@param  key              The feature to be added to the RemoveEvent
  -     *      attribute
  -     *@exception  IOException  Description of the Exception
  +     * 
        */
       public void addRemoveEvent( Serializable key )
           throws IOException
       {
  -        if ( destroyed )
  -        {
  -            return;
  -            // Cache is dead.
  +        if ( ! destroyed )
  +        {        
  +            put( new RemoveEvent( key ) );
           }
  -        q.put( new RemoveEvent( key ) );
       }
   
  -
       /**
  -     *@exception  IOException  Description of the Exception
  +     *
        */
       public synchronized void addRemoveAllEvent()
           throws IOException
       {
  -        if ( destroyed )
  -        {
  -            return;
  -            // Cache is dead.
  +        if ( ! destroyed )
  +        {           
  +            put( new RemoveAllEvent() );
           }
  -        q.put( new RemoveAllEvent() );
       }
   
  -
       /**
  -     *@exception  IOException  Description of the Exception
  +     *
        */
       public synchronized void addDisposeEvent()
           throws IOException
       {
  -        if ( destroyed )
  +        if ( ! destroyed ) 
  +        { 
  +            put( new DisposeEvent() );
  +        }
  +    }
  +    
  +    /**
  +     * Adds an event to the queue.
  +     */
  +    private void put( AbstractCacheEvent event )
  +    {
  +        try
           {
  -            return;
  -            // Cache is dead.
  +            queue.put( event );
  +        }
  +        catch ( InterruptedException e )
  +        {
  +            // We should handle terminated gracefully here, however the
  +            // LinkedQueue implementation of Channel shouldn't throw
  +            // this since puts are non blocking. For now I will ignore 
  +            // it. [james@jamestaylor.org]
  +            
  +            // Options:
  +            //   - destory self
  +            //   - destory and rethrow
           }
  -        q.put( new DisposeEvent() );
       }
   
   
  @@ -236,17 +214,28 @@
            */
           public void run()
           {
  +            Runnable r = null;
  +            
               while ( !destroyed )
  -            {
  -                Runnable r = ( Runnable ) q.pull();
  +            {                               
  +                try
  +                {
  +                     r = ( Runnable ) queue.take();
  +                }
  +                catch ( InterruptedException e )
  +                {
  +                    // We were interrupted, so terminate gracefully.
  +                    
  +                    this.destroy();
  +                }
   
  -                if ( !destroyed )
  +                if ( !destroyed && r != null )
                   {
                       r.run();
                   }
               }
               // declare failure as listener is permanently unreachable.
  -            q = null;
  +            queue = null;
               listener = null;
               log.warn( "QProcessor exiting for " + CacheEventQueue.this );
           }
  @@ -300,8 +289,8 @@
               // Too bad.  The remote host is unreachable, so we give up.
               if ( ex != null )
               {
  -                log.warn( "Warning: *** give up propagation " + CacheEventQueue.this );
  -                log.logEx( ex );
  +                log.warn( "Giving up propagation " + CacheEventQueue.this, ex );
  +
                   destroy();
               }
               return;
  @@ -373,15 +362,12 @@
           protected void doRun()
               throws IOException
           {
  -            if ( debug )
  -            {
  -                log.debug( "doRun> key=" + ( String ) ice.getKey() );
  -            }
               /*
                * CacheElement ce = new CacheElement(cacheName, key, obj);
                * ce.setAttributes( attr );
                * ce.setGroupName( groupName );
                */
  +            
               listener.handlePut( ice );
           }
       }
  
  
  
  1.4       +36 -39    jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/utils/log/LogEventQueue.java
  
  Index: LogEventQueue.java
  ===================================================================
  RCS file: /home/cvs/jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/utils/log/LogEventQueue.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- LogEventQueue.java	15 Jan 2002 21:33:36 -0000	1.3
  +++ LogEventQueue.java	14 Feb 2002 20:42:44 -0000	1.4
  @@ -3,36 +3,27 @@
   import org.apache.stratum.jcs.utils.log.ILogEventQueue;
   import org.apache.stratum.jcs.utils.log.ILogListener;
   
  -import org.apache.stratum.jcs.utils.reuse.ITomcatQueue;
  -import org.apache.stratum.jcs.utils.reuse.TomcatJglQueue;
  +import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
   
   /**
  - *  The log event queue where all logging requests are queued and processed in
  - *  FIFO order.
  + * The log event queue where all logging requests are queued and processed in
  + * FIFO order.
    *
  - *@author     asmuts
  - *@created    January 15, 2002
  - */
  -// Note: don't use Logger in this class.  Else may lead to endless recursive loop.
  + * Note: don't use Logger in this class.  Else may lead to endless recursive 
  + *       loop.
  + */ 
   class LogEventQueue implements ILogEventQueue
   {
       private static boolean debug = false;
  -    private ITomcatQueue q = new TomcatJglQueue();
       private ILogListener listener;
  +    
  +    private LinkedQueue queue = new LinkedQueue();
   
       private boolean destroyed;
       private Thread t;
  -    // TODO: parameterize the max q size.
  -    /**
  -     *  Description of the Field
  -     */
  -    public static int MAX_Q_SIZE = 1024 * 1024;
  -
   
       /**
  -     *  Constructs with the specified listener and the cache name.
  -     *
  -     *@param  listener  Description of the Parameter
  +     * Constructs with the specified listener and the cache name.
        */
       LogEventQueue( ILogListener listener )
       {
  @@ -63,7 +54,7 @@
           }
           destroyed = true;
           // sychronize the q so the thread will not wait forever.
  -        synchronized ( q )
  +        synchronized ( queue )
           {
               t.interrupt();
           }
  @@ -98,14 +89,16 @@
           if ( debug )
           {
               p( "addClearLogEvent>" );
  +        }       
  +        
  +        try
  +        {
  +            queue.put( new ClearLogEvent( logname ) );
           }
  -        if ( q.size() > MAX_Q_SIZE )
  +        catch ( InterruptedException e )
           {
  -            q.get();
  -            // drop the front item.
  -            p( "Log q size exceeded.  Dropping front item." );
  +            // Ignored, will return. FIXME: Should rethrow?
           }
  -        q.put( new ClearLogEvent( logname ) );
   
           if ( debug )
           {
  @@ -129,14 +122,16 @@
           {
               p( "addCycleLogEvent>" );
           }
  -        if ( q.size() > MAX_Q_SIZE )
  +                
  +        try
           {
  -            q.get();
  -            // drop the front item.
  -            p( "Log q size exceeded.  Dropping front item." );
  +            queue.put( new CycleLogEvent( logname ) );
           }
  -        q.put( new CycleLogEvent( logname ) );
  -
  +        catch ( InterruptedException e )
  +        {
  +            // Ignored, will return. FIXME: Should rethrow?
  +        }
  +        
           if ( debug )
           {
               p( "addCycleLogEvent done>" );
  @@ -160,20 +155,22 @@
           {
               p( "addDumpEvent> " + sb );
           }
  -        if ( q.size() > MAX_Q_SIZE )
  +        
  +        try
           {
  -            q.get();
  -            // drop the front item.
  -            p( "Log q size exceeded.  Dropping front item." );
  +            queue.put( new DumpEvent( logname, sb ) );
           }
  -        q.put( new DumpEvent( logname, sb ) );
  -
  +        catch ( InterruptedException e )
  +        {
  +            // Ignored, will return. FIXME: Should rethrow?
  +        }
  +        
           if ( debug )
           {
               p( "addDumpEvent done>" );
           }
       }
  -
  +    
   ///////////////////////////// Inner classes /////////////////////////////
   
       /**
  @@ -203,7 +200,7 @@
               {
                   try
                   {
  -                    Runnable r = ( Runnable ) q.pull();
  +                    Runnable r = ( Runnable ) queue.take();
   
                       if ( !destroyed )
                       {
  @@ -215,7 +212,7 @@
                       ex.printStackTrace( System.err );
                   }
               }
  -            q = null;
  +            queue = null;
               listener = null;
               p( "QProcessor exiting for " + LogEventQueue.this );
           }
  
  
  

--
To unsubscribe, e-mail:   <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>