You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@cocoon.apache.org by gi...@apache.org on 2004/11/05 23:47:22 UTC

svn commit: rev 56701 - in cocoon/trunk/src: java/org/apache/cocoon java/org/apache/cocoon/components/flow java/org/apache/cocoon/components/thread webapp/WEB-INF

Author: giacomo
Date: Fri Nov  5 14:47:21 2004
New Revision: 56701

Modified:
   cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles
   cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java
   cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java
   cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf
Log:
- made DefaultRunnableManager be able to start and stop
- added DefaultRunnableManager to roles and cocoon.xconf
- made ContinuationsManagerImpl use the RunnableManager for expire Continuation checking


Modified: cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles
==============================================================================
--- cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles	(original)
+++ cocoon/trunk/src/java/org/apache/cocoon/cocoon.roles	Fri Nov  5 14:47:21 2004
@@ -218,5 +218,10 @@
   <role name="org.apache.cocoon.components.persistence.RequestDataStore"
         shorthand="request-data-store"
         default-class="org.apache.cocoon.components.persistence.RequestDataStoreImpl"/>
+  
+  <!-- Running commands (Runnable) in background -->
+  <role name="org.apache.cocoon.components.thread.RunnableManager"
+        shorthand="runnable-manager"
+        default-class="org.apache.cocoon.components.thread.DefaultRunnableManager"/>
 
 </role-list>

Modified: cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java
==============================================================================
--- cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java	(original)
+++ cocoon/trunk/src/java/org/apache/cocoon/components/flow/ContinuationsManagerImpl.java	Fri Nov  5 14:47:21 2004
@@ -30,7 +30,11 @@
 import org.apache.avalon.framework.configuration.Configurable;
 import org.apache.avalon.framework.configuration.Configuration;
 import org.apache.avalon.framework.logger.AbstractLogEnabled;
+import org.apache.avalon.framework.service.ServiceException;
+import org.apache.avalon.framework.service.ServiceManager;
+import org.apache.avalon.framework.service.Serviceable;
 import org.apache.avalon.framework.thread.ThreadSafe;
+import org.apache.cocoon.components.thread.RunnableManager;
 
 /**
  * The default implementation of {@link ContinuationsManager}.
@@ -43,7 +47,7 @@
  */
 public class ContinuationsManagerImpl
         extends AbstractLogEnabled
-        implements ContinuationsManager, Configurable, Disposable, ThreadSafe {
+        implements ContinuationsManager, Configurable, Disposable, ThreadSafe, Serviceable {
 
     static final int CONTINUATION_ID_LENGTH = 20;
     static final String EXPIRE_CONTINUATIONS = "expire-continuations";
@@ -54,8 +58,6 @@
     protected SecureRandom random;
     protected byte[] bytes;
 
-    protected ContinuationInterrupt interrupt;
-
     /**
      * How long does a continuation exist in memory since the last
      * access? The time is in miliseconds, and the default is 1 hour.
@@ -84,6 +86,7 @@
 
     private String instrumentableName;
 
+    private ServiceManager serviceManager;
 
     public ContinuationsManagerImpl() throws Exception {
         try {
@@ -103,13 +106,17 @@
         this.defaultTimeToLive = config.getAttributeAsInteger("time-to-live", (3600 * 1000));
 
         final Configuration expireConf = config.getChild("expirations-check");
+        final long initialDelay = expireConf.getChild("offset", true).getValueAsLong(180000);
+        final long interval = expireConf.getChild("period", true).getValueAsLong(180000);
         try {
-            this.interrupt = new ContinuationInterrupt(expireConf);
-            Thread thread = new Thread(interrupt);
-            thread.setDaemon(true);
-            thread.setName("continuation-interrupt");
-            thread.start();
-            Thread.yield();
+            final RunnableManager runnableManager = (RunnableManager)serviceManager.lookup(RunnableManager.ROLE);
+            runnableManager.execute( new Runnable() {
+                    public void run()
+                    {
+                        expireContinuations();
+                    }
+                }, initialDelay, interval);
+            serviceManager.release(runnableManager);
         } catch (Exception e) {
             getLogger().warn("Could not enqueue continuations expiration task. " +
                              "Continuations will not automatically expire.", e);
@@ -117,13 +124,22 @@
     }
 
     /* (non-Javadoc)
+     * @see org.apache.avalon.framework.service.Serviceable#service()
+     */
+    public void service( ServiceManager manager )
+    throws ServiceException
+    {
+        this.serviceManager = manager;
+    }
+
+    /* (non-Javadoc)
      * @see org.apache.avalon.framework.activity.Disposable#dispose()
      */
     public void dispose() {
         // stop the thread
-        if ( this.interrupt != null ) {
+        /*if ( this.interrupt != null ) {
             this.interrupt.doRun = false;
-        }
+        }*/
     }
     
     public WebContinuation createWebContinuation(Object kont,
@@ -321,7 +337,7 @@
     /**
      * Remove all continuations which have already expired.
      */
-    private void expireContinuations() {
+    protected void expireContinuations() {
         long now = 0;
         if (getLogger().isDebugEnabled()) {
             now = System.currentTimeMillis();
@@ -352,47 +368,6 @@
             displayAllContinuations();
             displayExpireSet();
             */
-        }
-    }
-
-
-    final class ContinuationInterrupt implements Runnable {
-        private final long interval;
-        private final long initialDelay;
-
-        public boolean doRun;
-        
-        /**
-         * @param expireConf
-         */
-        public ContinuationInterrupt(Configuration expireConf) {
-            // only periodic time triggers are supported
-            this.initialDelay = expireConf.getChild("offset", true).getValueAsLong(100);
-            this.interval = expireConf.getChild("period", true).getValueAsLong(100);
-        }
-
-        /**
-         * expire any continuations that need expiring.
-         */
-        public void run() {
-            this.doRun = true;
-            if ( this.initialDelay > 0 ) {
-                // Sleep
-                try {
-                    Thread.sleep(this.initialDelay);
-                } catch (InterruptedException ignore) { 
-                    // ignore
-                }
-            }
-            while (doRun) {
-                expireContinuations();
-                // Sleep
-                try {
-                    Thread.sleep(this.interval);
-                } catch (InterruptedException ignore) { 
-                    // ignore
-                }
-            }
         }
     }
 }

Modified: cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java
==============================================================================
--- cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java	(original)
+++ cocoon/trunk/src/java/org/apache/cocoon/components/thread/DefaultRunnableManager.java	Fri Nov  5 14:47:21 2004
@@ -16,11 +16,14 @@
 package org.apache.cocoon.components.thread;
 
 import org.apache.avalon.framework.activity.Disposable;
+import org.apache.avalon.framework.activity.Initializable;
+import org.apache.avalon.framework.activity.Startable;
 import org.apache.avalon.framework.configuration.Configurable;
 import org.apache.avalon.framework.configuration.Configuration;
 import org.apache.avalon.framework.configuration.ConfigurationException;
 import org.apache.avalon.framework.logger.AbstractLogEnabled;
 import org.apache.avalon.framework.logger.Logger;
+import org.apache.avalon.framework.thread.ThreadSafe;
 
 import java.util.HashMap;
 import java.util.Iterator;
@@ -59,7 +62,8 @@
  */
 public class DefaultRunnableManager
     extends AbstractLogEnabled
-    implements RunnableManager, Configurable, Disposable, Runnable
+    implements RunnableManager, Configurable, Disposable, Runnable, Startable,
+                   ThreadSafe
 {
     //~ Static fields/initializers ---------------------------------------------
 
@@ -80,7 +84,7 @@
     public static final String DEFAULT_THREAD_PRIORITY = "NORM";
 
     /** The default keep alive time */
-    public static final long DEFAULT_KEEP_ALIVE_TIME = 20000L;
+    public static final long DEFAULT_KEEP_ALIVE_TIME = 60000L;
 
     /** The default way to shutdown gracefully */
     public static final boolean DEFAULT_SHUTDOWN_GRACEFUL = false;
@@ -93,17 +97,20 @@
 
     //~ Instance fields --------------------------------------------------------
 
+    /**
+     * Sorted set of <code>ExecutionInfo</code> instances, based on their next
+     * execution time.
+     */
+    protected SortedSet m_executionInfo = new TreeSet(  );
+
     /** The managed thread pools */
     final Map m_pools = new HashMap(  );
 
     /** The configured default ThreadFactory class instance */
     private Class m_defaultThreadFactoryClass;
 
-    /**
-     * Sorted set of <code>ExecutionInfo</code> instances, based on their next
-     * execution time.
-     */
-    protected SortedSet m_executionInfo = new TreeSet(  );
+    /** Keep us running? */
+    private boolean m_keepRunning = false;
 
     //~ Methods ----------------------------------------------------------------
 
@@ -255,13 +262,40 @@
      */
     public void dispose(  )
     {
+        if( getLogger(  ).isDebugEnabled(  ) )
+        {
+            getLogger(  ).debug( "Disposing all thread pools" );
+        }
+
         for( final Iterator i = m_pools.keySet(  ).iterator(  ); i.hasNext(  ); )
         {
-            final DefaultThreadPool pool = (DefaultThreadPool)i.next(  );
+            final String poolName = (String)i.next(  );
+            final DefaultThreadPool pool =
+                (DefaultThreadPool)m_pools.get( poolName );
+
+            if( getLogger(  ).isDebugEnabled(  ) )
+            {
+                getLogger(  ).debug( "Disposing thread pool " +
+                                     pool.getName(  ) );
+            }
+
             pool.shutdown(  );
+
+            if( getLogger(  ).isDebugEnabled(  ) )
+            {
+                getLogger(  ).debug( "Thread pool " + pool.getName(  ) +
+                                     " disposed" );
+            }
         }
 
-        m_pools.clear(  );
+        try
+        {
+            m_pools.clear(  );
+        }
+        catch( final Throwable t )
+        {
+            getLogger(  ).error( "Cannot dispose", t );
+        }
     }
 
     /**
@@ -271,6 +305,8 @@
      * @param command The {@link Runnable} to execute
      * @param delay the delay befor first run
      * @param interval The interval for repeated runs
+     *
+     * @throws IllegalArgumentException DOCUMENT ME!
      */
     public void execute( final String threadPoolName,
                          final Runnable command,
@@ -281,10 +317,12 @@
         {
             throw new IllegalArgumentException( "delay < 0" );
         }
+
         if( interval < 0 )
         {
             throw new IllegalArgumentException( "interval < 0" );
         }
+
         ThreadPool pool = (ThreadPool)m_pools.get( threadPoolName );
 
         if( null == pool )
@@ -295,6 +333,13 @@
             pool = (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME );
         }
 
+        if( getLogger(  ).isDebugEnabled(  ) )
+        {
+            getLogger(  ).debug( "Command entered: " + command.toString(  ) +
+                                 ",pool=" + pool.getName(  ) + ",delay=" +
+                                 delay + ",interval=" + interval );
+        }
+
         new ExecutionInfo( pool, command, delay, interval, getLogger(  ) );
     }
 
@@ -365,7 +410,12 @@
      */
     public void run(  )
     {
-        while( true )
+        if( getLogger(  ).isDebugEnabled(  ) )
+        {
+            getLogger(  ).debug( "Entering loop" );
+        }
+
+        while( m_keepRunning )
         {
             synchronized( m_executionInfo )
             {
@@ -385,31 +435,74 @@
                     }
                     else
                     {
-                        if(getLogger().isDebugEnabled() )
+                        if( getLogger(  ).isDebugEnabled(  ) )
                         {
-                            getLogger().debug( "No commands available. Will just wait for one" );
+                            getLogger(  ).debug( "No commands available. Will just wait for one" );
                         }
+
                         m_executionInfo.wait(  );
                     }
                 }
                 catch( final InterruptedException ie )
                 {
-                    if(getLogger().isDebugEnabled() )
+                    if( getLogger(  ).isDebugEnabled(  ) )
                     {
-                        getLogger().debug( "I've been interrupted" );
+                        getLogger(  ).debug( "I've been interrupted" );
                     }
                 }
 
-                final ExecutionInfo info = (ExecutionInfo)m_executionInfo.first(  );
-                final long delay =
-                    info.m_nextRun - System.currentTimeMillis(  );
-
-                if( delay < 0 )
+                if( m_keepRunning )
                 {
-                    info.execute(  );
+                    final ExecutionInfo info =
+                        (ExecutionInfo)m_executionInfo.first(  );
+                    final long delay =
+                        info.m_nextRun - System.currentTimeMillis(  );
+
+                    if( delay < 0 )
+                    {
+                        info.execute(  );
+                    }
                 }
             }
         }
+
+        if( getLogger(  ).isDebugEnabled(  ) )
+        {
+            getLogger(  ).debug( "Exiting loop" );
+        }
+    }
+
+    /**
+     * Start the managing thread
+     *
+     * @throws Exception DOCUMENT ME!
+     */
+    public void start(  )
+        throws Exception
+    {
+        if( getLogger(  ).isDebugEnabled(  ) )
+        {
+            getLogger(  ).debug( "starting heart" );
+        }
+
+        m_keepRunning = true;
+        ( (ThreadPool)m_pools.get( DEFAULT_THREADPOOL_NAME ) ).execute( this );
+    }
+
+    /**
+     * Stop the managing thread
+     *
+     * @throws Exception DOCUMENT ME!
+     */
+    public void stop(  )
+        throws Exception
+    {
+        m_keepRunning = false;
+
+        synchronized( m_executionInfo )
+        {
+            m_executionInfo.notifyAll(  );
+        }
     }
 
     /**
@@ -636,12 +729,19 @@
          */
         void execute(  )
         {
+            if( getLogger(  ).isDebugEnabled(  ) )
+            {
+                getLogger(  ).debug( "Executing Command: " +
+                                     m_command.toString(  ) + ",pool=" +
+                                     m_pool.getName(  ) + ",delay=" + m_delay +
+                                     ",interval=" + m_interval );
+            }
+
             synchronized( m_executionInfo )
             {
                 m_executionInfo.remove( this );
                 m_nextRun = ( ( m_interval > 0 )
-                        ? ( System.currentTimeMillis(  ) + m_interval ) : 0 );
-
+                              ? ( System.currentTimeMillis(  ) + m_interval ) : 0 );
 
                 if( m_nextRun > 0 )
                 {

Modified: cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf
==============================================================================
--- cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf	(original)
+++ cocoon/trunk/src/webapp/WEB-INF/cocoon.xconf	Fri Nov  5 14:47:21 2004
@@ -548,5 +548,93 @@
     <parameter name="preemptive-loader-url" 
                value="http://localhost:8080/cocoon/samples/cinclude/loader"/>
     -->
- </component>
+  </component>
+ 
+  <!--+
+      | Runnable manager
+      |
+      | this component manages commands (Runnables) executed in background using
+      | preconfigured pools of worker threads
+      +-->
+  <runnable-manager logger="core.runnable">
+    <!--+
+        | This is the default configuration of the runnable-manager. More 
+        | indepth information can be found at
+        | http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/PooledExecutor.html
+        | The following elements can be used:
+        |
+        | thread-factory:        specifies the fully qualified class name of an
+        |                        org.apache.cocoon.components.thread.ThreadFactory
+        |                        implementation. It is responsible to create Thread 
+        |                        classes.
+        | thread-pools:          container element for thread-pool elements.
+        | name:                  required name of the pool.
+        | priority:              optional priority all threads of the pool will 
+        |                        have (the ThreadFactory will be set to this 
+        |                        priority).The possible values  are: 
+        |                          MIN:  corresponds to Thread#MIN_PRIORITY
+        |                          NORM: corresponds to Thread#NORM_PRIORITY (default)
+        |                          MAX:  corresponds to Thread#MAX_PRIORITY
+        | queue-size:            optional size of a queue to hold Runnables if the 
+        |                        pool is full. Possible values are:
+        |                          less than 0:    unbounded (default)
+        |                          equal to 0:     no queue at all
+        |                          greater than 0: size of the queue
+        | max-pool-size:         optional maximum number of threads in the pool. 
+        |                        Defaults to 5. 
+        |                        NOTE: if a queue is specified (queue-sie != 0) 
+        |                              this value will be ignored.
+        | min-pool-size:         optional minimum number of threads in the pool. 
+        |                        Defaults to 5. 
+        |                        NOTE: if a queue has been specified (queue-sie != 0)
+        |                              this value will be used as the maximum of
+        |                              thread running concurrently.
+        | keep-alive-time-ms:    The time in ms an idle thread should keep alive 
+        |                        before it might get garbage collected. This 
+        |                        defaults to 60000 ms.
+        | block-policy;          The policy to be used if all resources (thread in
+        |                        the pool and slots in the queue) are exhausted.
+        |                        Possible values are:
+        |                          ABORT:         Throw a RuntimeException
+        |                          DISCARD:       Throw away the current request 
+        |                                         and return.
+        |                          DISCARDOLDEST: Throw away the oldest request 
+        |                                         and return.
+        |                          RUN (default): The thread making the execute 
+        |                                         request runs the task itself. 
+        |                                         This policy helps guard against 
+        |                                         lockup.
+        |                          WAIT:          Wait until a thread becomes 
+        |                                         available. This policy should, in 
+        |                                         general, not be used if the 
+        |                                         minimum number of threads is zero, 
+        |                                         in which case a thread may never 
+        |                                         become available.
+        | shutdown-graceful:     Terminate thread pool after processing all 
+        |                        Runnables currently in queue. Any Runnable entered 
+        |                        after this point will be discarded. A shut down 
+        |                        pool cannot be restarted. This also means that a 
+        |                        pool will need keep-alive-time-ms to terminate. 
+        |                        The default value not to shutdown graceful.
+        | shutdown-wait-time-ms: The time in ms to wait before issuing an 
+        |                        immediate shutdown after a graceful shutdown 
+        |                        has been requested.
+        +-->
+    <!--
+    <thread-factory>org.apache.cocoon.components.thread.DefaultThreadFactory</thread-factory>
+    <thread-pools>
+      <thread-pool>  
+        <name>default</name>
+        <priority>NORM</priority>
+        <queue-size>-1</queue-size>
+        <max-pool-size>5</max-pool-size>
+        <min-pool-size>5</min-pool-size>
+        <keep-alive-time-ms>60000</keep-alive-time-ms>
+        <block-policy>RUN</block-policy>
+        <shutdown-graceful>false</shutdown-graceful>
+        <shutdown-wait-time-ms>-1</shutdown-wait-time-ms>
+      </thread-pool> 
+    </thread-pools>
+    -->
+  </runnable-manager>
 </cocoon>