You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@turbine.apache.org by jt...@apache.org on 2002/02/14 21:42:44 UTC
cvs commit: jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/utils/reuse IFastQueue.java ITomcatQueue.java TomcatJglQueue.java TomcatQueue.java
jtaylor 02/02/14 12:42:44
Modified: . build-test.xml build.xml default.properties
src/java/org/apache/stratum/jcs/engine CacheEventQueue.java
src/java/org/apache/stratum/jcs/utils/log LogEventQueue.java
Removed: src/java/org/apache/stratum/jcs/utils/reuse IFastQueue.java
ITomcatQueue.java TomcatJglQueue.java
TomcatQueue.java
Log:
Moving to LinkedQueue from util.concurrent, removed the other queue. Haven't
tuned this much yet; not worried about LogEventQueue since it will be replaced
by commons-logging + log4j (or whatever).
Revision Changes Path
1.6 +1 -0 jakarta-turbine-stratum/build-test.xml
Index: build-test.xml
===================================================================
RCS file: /home/cvs/jakarta-turbine-stratum/build-test.xml,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- build-test.xml 14 Feb 2002 00:57:41 -0000 1.5
+++ build-test.xml 14 Feb 2002 20:42:44 -0000 1.6
@@ -17,6 +17,7 @@
<pathelement location="${commons-collections.jar}"/>
<pathelement location="${commons-util.jar}"/>
<pathelement location="${commons-logging.jar}"/>
+ <pathelement location="${concurrent.jar}"/>
<pathelement location="${jgl.jar}"/>
<pathelement location="${jetty.jar}"/>
<pathelement location="${log4j.jar}"/>
1.20 +2 -1 jakarta-turbine-stratum/build.xml
Index: build.xml
===================================================================
RCS file: /home/cvs/jakarta-turbine-stratum/build.xml,v
retrieving revision 1.19
retrieving revision 1.20
diff -u -r1.19 -r1.20
--- build.xml 14 Feb 2002 01:01:23 -0000 1.19
+++ build.xml 14 Feb 2002 20:42:44 -0000 1.20
@@ -17,6 +17,7 @@
<pathelement location="${commons-collections.jar}"/>
<pathelement location="${commons-util.jar}"/>
<pathelement location="${commons-logging.jar}"/>
+ <pathelement location="${concurrent.jar}"/>
<pathelement location="${jgl.jar}"/>
<pathelement location="${jetty.jar}"/>
<pathelement location="${log4j.jar}"/>
@@ -132,7 +133,7 @@
<mkdir dir="${build.src}"/>
<mkdir dir="${build.dest}"/>
- <copy todir="${build.src}" overwrite="yes">
+ <copy todir="${build.src}">
<fileset dir="${src.dir}/java/"/>
</copy>
1.18 +1 -0 jakarta-turbine-stratum/default.properties
Index: default.properties
===================================================================
RCS file: /home/cvs/jakarta-turbine-stratum/default.properties,v
retrieving revision 1.17
retrieving revision 1.18
diff -u -r1.17 -r1.18
--- default.properties 14 Feb 2002 12:52:00 -0000 1.17
+++ default.properties 14 Feb 2002 20:42:44 -0000 1.18
@@ -46,6 +46,7 @@
commons-collections.jar = ${lib.repo}/commons-collections.jar
commons-util.jar = ${lib.repo}/commons-util-0.1-dev.jar
commons-logging.jar = ${lib.repo}/commons-logging.jar
+concurrent.jar = ${lib.repo}/concurrent.jar
dom4j.jar = ${lib.repo}/dom4j-1.1.jar
junit.jar = ${lib.repo}/junit-3.7.jar
1.4 +100 -114 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/engine/CacheEventQueue.java
Index: CacheEventQueue.java
===================================================================
RCS file: /home/cvs/jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/engine/CacheEventQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- CacheEventQueue.java 15 Jan 2002 21:33:33 -0000 1.3
+++ CacheEventQueue.java 14 Feb 2002 20:42:44 -0000 1.4
@@ -7,66 +7,58 @@
import org.apache.stratum.jcs.engine.behavior.ICacheEventQueue;
import org.apache.stratum.jcs.engine.behavior.ICacheListener;
-import org.apache.stratum.jcs.utils.log.Logger;
-import org.apache.stratum.jcs.utils.log.LoggerManager;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import org.apache.stratum.jcs.utils.reuse.ITomcatQueue;
-import org.apache.stratum.jcs.utils.reuse.TomcatJglQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogSource;
/**
- * An event queue is used to propagate ordered cache events to one and only one
- * target listener.
- *
- *@author asmuts
- *@created January 15, 2002
+ * An event queue is used to propagate ordered cache events to one and only one
+ * target listener.
*/
public class CacheEventQueue implements ICacheEventQueue
{
- private static boolean debug = false;
- private ITomcatQueue q = new TomcatJglQueue();
+ private static final Log log = LogSource.getInstance( CacheEventQueue.class );
+
+ private LinkedQueue queue = new LinkedQueue();
+
private ICacheListener listener;
private byte listenerId;
private String cacheName;
- private Logger log;
private int failureCount;
private int maxFailure;
- private int waitBeforeRetry;
+
// in milliseconds
+ private int waitBeforeRetry;
+
private boolean destroyed;
private Thread t;
/**
- * Constructs with the specified listener and the cache name.
- *
- *@param listener Description of the Parameter
- *@param listenerId Description of the Parameter
- *@param cacheName Description of the Parameter
+ * Constructs with the specified listener and the cache name.
*/
- public CacheEventQueue( ICacheListener listener, byte listenerId, String cacheName )
+ public CacheEventQueue( ICacheListener listener,
+ byte listenerId,
+ String cacheName )
{
this( listener, listenerId, cacheName, 10, 500 );
}
-
-
+
/**
- * Constructor for the CacheEventQueue object
- *
- *@param listener Description of the Parameter
- *@param listenerId Description of the Parameter
- *@param cacheName Description of the Parameter
- *@param maxFailure Description of the Parameter
- *@param waitBeforeRetry Description of the Parameter
+ * Constructor for the CacheEventQueue object
*/
- public CacheEventQueue( ICacheListener listener, byte listenerId, String cacheName,
- int maxFailure, int waitBeforeRetry )
+ public CacheEventQueue( ICacheListener listener,
+ byte listenerId,
+ String cacheName,
+ int maxFailure,
+ int waitBeforeRetry )
{
if ( listener == null )
{
throw new IllegalArgumentException( "listener must not be null" );
}
- this.log = LoggerManager.getLogger( this );
this.listener = listener;
this.listenerId = listenerId;
@@ -77,141 +69,127 @@
t = new QProcessor();
t.start();
- if ( debug )
+ if ( log.isDebugEnabled() )
{
- log.debug( "Constructed for " + this );
+ log.debug( "Constructed: " + this );
}
}
-
- //////////////////////////////////////////////////////
/**
- * Description of the Method
+ *
*/
public synchronized void destroy()
{
- if ( destroyed )
- {
- return;
- }
- destroyed = true;
- // sychronize the q so the thread will not wait forever.
- synchronized ( q )
- {
- t.interrupt();
+ if ( ! destroyed )
+ {
+ destroyed = true;
+
+ // sychronize on queue so the thread will not wait forever,
+ // and then interrupt the QueueProcessor
+
+ synchronized ( queue )
+ {
+ t.interrupt();
+ }
+
+ t = null;
+
+ log.warn( "Cache event queue destroyed: " + this );
}
- t = null;
- log.warn( "*** Cache event queue destroyed " + this );
}
-
- //////////////////////////////////////////////////////
/**
- * Description of the Method
- *
- *@return Description of the Return Value
+ *
*/
public String toString()
{
return "listenerId=" + listenerId + ", cacheName=" + cacheName;
}
-
- ////////////////////////////////////////////////////
/**
- * Gets the alive attribute of the CacheEventQueue object
- *
- *@return The alive value
+ *
*/
public boolean isAlive()
{
- return !destroyed;
+ return ( ! destroyed );
}
-
- /////////////////////////////////////////////////////
/**
- * Gets the listenerId attribute of the CacheEventQueue object
- *
- *@return The listenerId value
+ *
*/
public byte getListenerId()
{
return listenerId;
}
-
-
- //////////////////////////////////////////////////////////////////////////////
+
/**
- * Adds a feature to the PutEvent attribute of the CacheEventQueue object
- *
- *@param ce The feature to be added to the PutEvent attribute
- *@exception IOException Description of the Exception
+ *
*/
public synchronized void addPutEvent( ICacheElement ce )
throws IOException
{
- if ( destroyed )
+ if ( ! destroyed )
{
- return;
- // Cache is dead.
- }
- if ( debug )
- {
- log.debug( "addPutEvent> key=" + ( String ) ce.getKey() );
- }
- q.put( new PutEvent( ce ) );
- if ( debug )
- {
- log.debug( "addPutEvent done> key=" + ( String ) ce.getKey() );
+ put( new PutEvent( ce ) );
}
}
-
/**
- *@param key The feature to be added to the RemoveEvent
- * attribute
- *@exception IOException Description of the Exception
+ *
*/
public void addRemoveEvent( Serializable key )
throws IOException
{
- if ( destroyed )
- {
- return;
- // Cache is dead.
+ if ( ! destroyed )
+ {
+ put( new RemoveEvent( key ) );
}
- q.put( new RemoveEvent( key ) );
}
-
/**
- *@exception IOException Description of the Exception
+ *
*/
public synchronized void addRemoveAllEvent()
throws IOException
{
- if ( destroyed )
- {
- return;
- // Cache is dead.
+ if ( ! destroyed )
+ {
+ put( new RemoveAllEvent() );
}
- q.put( new RemoveAllEvent() );
}
-
/**
- *@exception IOException Description of the Exception
+ *
*/
public synchronized void addDisposeEvent()
throws IOException
{
- if ( destroyed )
+ if ( ! destroyed )
+ {
+ put( new DisposeEvent() );
+ }
+ }
+
+ /**
+ * Adds an event to the queue.
+ */
+ private void put( AbstractCacheEvent event )
+ {
+ try
{
- return;
- // Cache is dead.
+ queue.put( event );
+ }
+ catch ( InterruptedException e )
+ {
+ // We should handle terminated gracefully here, however the
+ // LinkedQueue implementation of Channel shouldn't throw
+ // this since puts are non blocking. For now I will ignore
+ // it. [james@jamestaylor.org]
+
+ // Options:
+ // - destory self
+ // - destory and rethrow
}
- q.put( new DisposeEvent() );
}
@@ -236,17 +214,28 @@
*/
public void run()
{
+ Runnable r = null;
+
while ( !destroyed )
- {
- Runnable r = ( Runnable ) q.pull();
+ {
+ try
+ {
+ r = ( Runnable ) queue.take();
+ }
+ catch ( InterruptedException e )
+ {
+ // We were interrupted, so terminate gracefully.
+
+ this.destroy();
+ }
- if ( !destroyed )
+ if ( !destroyed && r != null )
{
r.run();
}
}
// declare failure as listener is permanently unreachable.
- q = null;
+ queue = null;
listener = null;
log.warn( "QProcessor exiting for " + CacheEventQueue.this );
}
@@ -300,8 +289,8 @@
// Too bad. The remote host is unreachable, so we give up.
if ( ex != null )
{
- log.warn( "Warning: *** give up propagation " + CacheEventQueue.this );
- log.logEx( ex );
+ log.warn( "Giving up propagation " + CacheEventQueue.this, ex );
+
destroy();
}
return;
@@ -373,15 +362,12 @@
protected void doRun()
throws IOException
{
- if ( debug )
- {
- log.debug( "doRun> key=" + ( String ) ice.getKey() );
- }
/*
* CacheElement ce = new CacheElement(cacheName, key, obj);
* ce.setAttributes( attr );
* ce.setGroupName( groupName );
*/
+
listener.handlePut( ice );
}
}
1.4 +36 -39 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/utils/log/LogEventQueue.java
Index: LogEventQueue.java
===================================================================
RCS file: /home/cvs/jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/utils/log/LogEventQueue.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- LogEventQueue.java 15 Jan 2002 21:33:36 -0000 1.3
+++ LogEventQueue.java 14 Feb 2002 20:42:44 -0000 1.4
@@ -3,36 +3,27 @@
import org.apache.stratum.jcs.utils.log.ILogEventQueue;
import org.apache.stratum.jcs.utils.log.ILogListener;
-import org.apache.stratum.jcs.utils.reuse.ITomcatQueue;
-import org.apache.stratum.jcs.utils.reuse.TomcatJglQueue;
+import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
/**
- * The log event queue where all logging requests are queued and processed in
- * FIFO order.
+ * The log event queue where all logging requests are queued and processed in
+ * FIFO order.
*
- *@author asmuts
- *@created January 15, 2002
- */
-// Note: don't use Logger in this class. Else may lead to endless recursive loop.
+ * Note: don't use Logger in this class. Else may lead to endless recursive
+ * loop.
+ */
class LogEventQueue implements ILogEventQueue
{
private static boolean debug = false;
- private ITomcatQueue q = new TomcatJglQueue();
private ILogListener listener;
+
+ private LinkedQueue queue = new LinkedQueue();
private boolean destroyed;
private Thread t;
- // TODO: parameterize the max q size.
- /**
- * Description of the Field
- */
- public static int MAX_Q_SIZE = 1024 * 1024;
-
/**
- * Constructs with the specified listener and the cache name.
- *
- *@param listener Description of the Parameter
+ * Constructs with the specified listener and the cache name.
*/
LogEventQueue( ILogListener listener )
{
@@ -63,7 +54,7 @@
}
destroyed = true;
// sychronize the q so the thread will not wait forever.
- synchronized ( q )
+ synchronized ( queue )
{
t.interrupt();
}
@@ -98,14 +89,16 @@
if ( debug )
{
p( "addClearLogEvent>" );
+ }
+
+ try
+ {
+ queue.put( new ClearLogEvent( logname ) );
}
- if ( q.size() > MAX_Q_SIZE )
+ catch ( InterruptedException e )
{
- q.get();
- // drop the front item.
- p( "Log q size exceeded. Dropping front item." );
+ // Ignored, will return. FIXME: Should rethrow?
}
- q.put( new ClearLogEvent( logname ) );
if ( debug )
{
@@ -129,14 +122,16 @@
{
p( "addCycleLogEvent>" );
}
- if ( q.size() > MAX_Q_SIZE )
+
+ try
{
- q.get();
- // drop the front item.
- p( "Log q size exceeded. Dropping front item." );
+ queue.put( new CycleLogEvent( logname ) );
}
- q.put( new CycleLogEvent( logname ) );
-
+ catch ( InterruptedException e )
+ {
+ // Ignored, will return. FIXME: Should rethrow?
+ }
+
if ( debug )
{
p( "addCycleLogEvent done>" );
@@ -160,20 +155,22 @@
{
p( "addDumpEvent> " + sb );
}
- if ( q.size() > MAX_Q_SIZE )
+
+ try
{
- q.get();
- // drop the front item.
- p( "Log q size exceeded. Dropping front item." );
+ queue.put( new DumpEvent( logname, sb ) );
}
- q.put( new DumpEvent( logname, sb ) );
-
+ catch ( InterruptedException e )
+ {
+ // Ignored, will return. FIXME: Should rethrow?
+ }
+
if ( debug )
{
p( "addDumpEvent done>" );
}
}
-
+
///////////////////////////// Inner classes /////////////////////////////
/**
@@ -203,7 +200,7 @@
{
try
{
- Runnable r = ( Runnable ) q.pull();
+ Runnable r = ( Runnable ) queue.take();
if ( !destroyed )
{
@@ -215,7 +212,7 @@
ex.printStackTrace( System.err );
}
}
- q = null;
+ queue = null;
listener = null;
p( "QProcessor exiting for " + LogEventQueue.this );
}
--
To unsubscribe, e-mail: <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>