You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2004/04/14 06:36:16 UTC
cvs commit: james-server/src/java/org/apache/james/transport JamesSpoolManager.java
noel 2004/04/13 21:36:16
Modified: src/java/org/apache/james/transport Tag: branch_2_1_fcs
JamesSpoolManager.java
Log:
Fix JAMES-9.
JamesSpoolManager.dispose() now tells the spooler threads to terminate
and then waits (up to 1 minute) for them to complete.
Commented out the use of ThreadPool. It was never actually used as a
thread pool. All of the threads were created at initialization, and
remained dedicated for the duration of the application. The way a
thread pool is normally used is for a series of short-lived runners.
Future revisions to the spooler could use a thread pool by refactoring
the run() method, and having a small number of threads that get mail
from the spool, and dispatch them to workers in a thread pool for
processing.
Revision Changes Path
No revision
No revision
1.20.4.15 +63 -11 james-server/src/java/org/apache/james/transport/JamesSpoolManager.java
Index: JamesSpoolManager.java
===================================================================
RCS file: /home/cvs/james-server/src/java/org/apache/james/transport/JamesSpoolManager.java,v
retrieving revision 1.20.4.14
retrieving revision 1.20.4.15
diff -u -r1.20.4.14 -r1.20.4.15
--- JamesSpoolManager.java 13 Apr 2004 19:53:34 -0000 1.20.4.14
+++ JamesSpoolManager.java 14 Apr 2004 04:36:15 -0000 1.20.4.15
@@ -18,7 +18,7 @@
package org.apache.james.transport;
import org.apache.avalon.cornerstone.services.threads.ThreadManager;
-import org.apache.avalon.excalibur.thread.ThreadPool;
+//import org.apache.avalon.excalibur.thread.ThreadPool;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.component.ComponentException;
@@ -38,6 +38,8 @@
import org.apache.mailet.*;
import javax.mail.MessagingException;
+
+import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
@@ -85,21 +87,43 @@
private int numThreads;
/**
- * The ThreadPool containing the spool threads.
+ * The ThreadPool containing worker threads.
+ *
+ * This used to be used, but for threads that lived the entire
+ * lifespan of the application. Currently commented out. In
+ * the future, we could use a thread pool to run short-lived
+ * workers, so that we have a smaller number of readers that
+ * accept a message from the spool, and dispatch to a pool of
+ * worker threads that process the message.
*/
- private ThreadPool workerPool;
+ // private ThreadPool workerPool;
/**
* The ThreadManager from which the thread pool is obtained.
*/
- private ThreadManager threadManager;
+ // private ThreadManager threadManager;
+
+ /**
+ * Number of active threads
+ */
+ private int numActive;
+
+ /**
+ * Spool threads are active
+ */
+ private boolean active;
+
+ /**
+ * Spool threads
+ */
+ private Collection spoolThreads;
/**
* @see org.apache.avalon.framework.component.Composable#compose(ComponentManager)
*/
public void compose(ComponentManager comp)
throws ComponentException {
- threadManager = (ThreadManager)comp.lookup( ThreadManager.ROLE );
+ // threadManager = (ThreadManager)comp.lookup( ThreadManager.ROLE );
compMgr = new DefaultComponentManager(comp);
}
@@ -117,7 +141,7 @@
public void initialize() throws Exception {
getLogger().info("JamesSpoolManager init...");
- workerPool = threadManager.getThreadPool( "default" );
+ // workerPool = threadManager.getThreadPool( "default" );
MailStore mailstore
= (MailStore) compMgr.lookup("org.apache.james.services.MailStore");
spool = mailstore.getInboundSpool();
@@ -295,8 +319,15 @@
.append(" Thread(s)");
getLogger().info(infoBuffer.toString());
}
- for ( int i = 0 ; i < numThreads ; i++ )
- workerPool.execute(this);
+
+ active = true;
+ numActive = 0;
+ spoolThreads = new java.util.ArrayList(numThreads);
+ for ( int i = 0 ; i < numThreads ; i++ ) {
+ Thread reader = new Thread(this, "Spool Thread #" + i);
+ spoolThreads.add(reader);
+ reader.start();
+ }
}
/**
@@ -312,7 +343,8 @@
getLogger().info("Spool=" + spool.getClass().getName());
}
- while(true) {
+ numActive++;
+ while(active) {
String key = null;
try {
MailImpl mail = (MailImpl)spool.accept();
@@ -348,6 +380,8 @@
spool.unlock(key);
}
mail = null;
+ } catch (InterruptedException ie) {
+ getLogger().info("Interrupted JamesSpoolManager: " + Thread.currentThread().getName());
} catch (Throwable e) {
if (getLogger().isErrorEnabled()) {
getLogger().error("Exception processing " + key + " in JamesSpoolManager.run "
@@ -369,7 +403,11 @@
*/
}
}
-
+ if (getLogger().isInfoEnabled())
+ {
+ getLogger().info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
+ }
+ numActive--;
}
/**
@@ -470,6 +508,20 @@
*/
public void dispose() {
getLogger().info("JamesSpoolManager dispose...");
+ active = false; // shutdown the threads
+ for (Iterator it = spoolThreads.iterator(); it.hasNext(); ) {
+ ((Thread) it.next()).interrupt(); // interrupt any waiting accept() calls.
+ }
+
+ long stop = System.currentTimeMillis() + 60000;
+ // give the spooler threads one minute to terminate gracefully
+ while (numActive != 0 && stop > System.currentTimeMillis()) {
+ try {
+ Thread.sleep(1000);
+ } catch (Exception ignored) {}
+ }
+ getLogger().info("JamesSpoolManager thread shutdown completed.");
+
Iterator it = processors.keySet().iterator();
while (it.hasNext()) {
String processorName = (String)it.next();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org