You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2004/04/14 06:36:16 UTC

cvs commit: james-server/src/java/org/apache/james/transport JamesSpoolManager.java

noel        2004/04/13 21:36:16

  Modified:    src/java/org/apache/james/transport Tag: branch_2_1_fcs
                        JamesSpoolManager.java
  Log:
  Fix JAMES-9.
  
  JamesSpoolManager.dispose() now tells the spooler threads to terminate
  and then waits (up to 1 minute) for them to complete.
  
  Commented out the use of ThreadPool.  It was never actually used as a
  thread pool.  All of the threads were created at initialization, and
  remained dedicated for the duration of the application.  The way a
  thread pool is normally used is for a series of short-lived runners.
  Future revisions to the spooler could use a thread pool by refactoring
  the run() method, and having a small number of threads that get mail
  from the spool, and dispatch them to workers in a thread pool for
  processing.
  
  Revision  Changes    Path
  No                   revision
  No                   revision
  1.20.4.15 +63 -11    james-server/src/java/org/apache/james/transport/JamesSpoolManager.java
  
  Index: JamesSpoolManager.java
  ===================================================================
  RCS file: /home/cvs/james-server/src/java/org/apache/james/transport/JamesSpoolManager.java,v
  retrieving revision 1.20.4.14
  retrieving revision 1.20.4.15
  diff -u -r1.20.4.14 -r1.20.4.15
  --- JamesSpoolManager.java	13 Apr 2004 19:53:34 -0000	1.20.4.14
  +++ JamesSpoolManager.java	14 Apr 2004 04:36:15 -0000	1.20.4.15
  @@ -18,7 +18,7 @@
   package org.apache.james.transport;
   
   import org.apache.avalon.cornerstone.services.threads.ThreadManager;
  -import org.apache.avalon.excalibur.thread.ThreadPool;
  +//import org.apache.avalon.excalibur.thread.ThreadPool;
   import org.apache.avalon.framework.activity.Disposable;
   import org.apache.avalon.framework.activity.Initializable;
   import org.apache.avalon.framework.component.ComponentException;
  @@ -38,6 +38,8 @@
   import org.apache.mailet.*;
   
   import javax.mail.MessagingException;
  +
  +import java.util.Collection;
   import java.util.HashMap;
   import java.util.Iterator;
   
  @@ -85,21 +87,43 @@
       private int numThreads;
   
       /**
  -     * The ThreadPool containing the spool threads.
  +     * The ThreadPool containing worker threads.
  +     *
  +     * This used to be used, but for threads that lived the entire
  +     * lifespan of the application.  Currently commented out.  In
  +     * the future, we could use a thread pool to run short-lived
  +     * workers, so that we have a smaller number of readers that
  +     * accept a message from the spool, and dispatch to a pool of
  +     * worker threads that process the message.
        */
  -    private ThreadPool workerPool;
  +    // private ThreadPool workerPool;
   
       /**
        * The ThreadManager from which the thread pool is obtained.
        */
  -    private ThreadManager threadManager;
  +    // private ThreadManager threadManager;
  +
  +    /**
  +     * Number of active threads
  +     */
  +    private int numActive;
  +
  +    /**
  +     * Spool threads are active
  +     */
  +    private boolean active;
  +
  +    /**
  +     * Spool threads
  +     */
  +    private Collection spoolThreads;
   
       /**
        * @see org.apache.avalon.framework.component.Composable#compose(ComponentManager)
        */
       public void compose(ComponentManager comp)
           throws ComponentException {
  -        threadManager = (ThreadManager)comp.lookup( ThreadManager.ROLE );
  +        // threadManager = (ThreadManager)comp.lookup( ThreadManager.ROLE );
           compMgr = new DefaultComponentManager(comp);
       }
   
  @@ -117,7 +141,7 @@
       public void initialize() throws Exception {
   
           getLogger().info("JamesSpoolManager init...");
  -        workerPool = threadManager.getThreadPool( "default" );
  +        // workerPool = threadManager.getThreadPool( "default" );
           MailStore mailstore
               = (MailStore) compMgr.lookup("org.apache.james.services.MailStore");
           spool = mailstore.getInboundSpool();
  @@ -295,8 +319,15 @@
                       .append(" Thread(s)");
               getLogger().info(infoBuffer.toString());
           }
  -        for ( int i = 0 ; i < numThreads ; i++ )
  -            workerPool.execute(this);
  +
  +        active = true;
  +        numActive = 0;
  +        spoolThreads = new java.util.ArrayList(numThreads);
  +        for ( int i = 0 ; i < numThreads ; i++ ) {
  +            Thread reader = new Thread(this, "Spool Thread #" + i);
  +            spoolThreads.add(reader);
  +            reader.start();
  +        }
       }
   
       /**
  @@ -312,7 +343,8 @@
               getLogger().info("Spool=" + spool.getClass().getName());
           }
   
  -        while(true) {
  +        numActive++;
  +        while(active) {
               String key = null;
               try {
                   MailImpl mail = (MailImpl)spool.accept();
  @@ -348,6 +380,8 @@
                       spool.unlock(key);
                   }
                   mail = null;
  +            } catch (InterruptedException ie) {
  +                getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
               } catch (Throwable e) {
                   if (getLogger().isErrorEnabled()) {
                       getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
  @@ -369,7 +403,11 @@
                   */
               }
           }
  -        
  +        if (getLogger().isInfoEnabled())
  +        {
  +            getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
  +        }
  +        numActive--;
       }
   
       /**
  @@ -470,6 +508,20 @@
        */
       public void dispose() {
           getLogger().info("JamesSpoolManager dispose...");
  +        active = false; // shutdown the threads
  +        for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
  +            ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
  +        }
  +
  +        long stop = System.currentTimeMillis() + 60000;
  +        // give the spooler threads one minute to terminate gracefully
  +        while (numActive != 0 && stop > System.currentTimeMillis()) {
  +            try {
  +                Thread.sleep(1000);
  +            } catch (Exception ignored) {}
  +        }
  +        getLogger().info("JamesSpoolManager thread shutdown completed.");
  +
           Iterator it = processors.keySet().iterator();
           while (it.hasNext()) {
               String processorName = (String)it.next();
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org