You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2003/11/16 22:47:24 UTC
cvs commit: james-server/src/xdocs provided_mailets_2_1.xml
noel 2003/11/16 13:47:24
Modified: src/conf Tag: branch_2_1_fcs james-config.xml
sqlResources.xml
src/java/org/apache/james/mailrepository Tag: branch_2_1_fcs
AvalonSpoolRepository.java JDBCSpoolRepository.java
src/java/org/apache/james/services Tag: branch_2_1_fcs
SpoolRepository.java
src/java/org/apache/james/transport Tag: branch_2_1_fcs
JamesSpoolManager.java
src/java/org/apache/james/transport/mailets Tag:
branch_2_1_fcs RemoteDelivery.java
src/xdocs Tag: branch_2_1_fcs provided_mailets_2_1.xml
Log:
Commit Soren Hilmer's change to support a flexible retry schedule for RemoteDelivery.
This commit also includes a concurrent change to SpoolRepository so that accept() is a one-step process that returns the Mail, rather than an unsynchronized two-step process. Another related change is that accept() takes a filter object to control the next message to be returned, rather than hardcoding the algorithm.
Revision Changes Path
No revision
No revision
1.40.2.20 +12 -6 james-server/src/conf/james-config.xml
Index: james-config.xml
===================================================================
RCS file: /home/cvs/james-server/src/conf/james-config.xml,v
retrieving revision 1.40.2.19
retrieving revision 1.40.2.20
diff -u -r1.40.2.19 -r1.40.2.20
--- james-config.xml 20 Oct 2003 03:14:15 -0000 1.40.2.19
+++ james-config.xml 16 Nov 2003 21:47:24 -0000 1.40.2.20
@@ -299,11 +299,17 @@
<outgoing> db://maildb/spool/outgoing </outgoing>
-->
- <!-- Number of milliseconds between delivery attempts -->
- <delayTime> 21600000 </delayTime>
-
- <!-- Number of failed attempts before returning to the sender -->
- <maxRetries> 5 </maxRetries>
+ <!-- Delivery Schedule based upon RFC 2821, 4.5.4.1 -->
+ <!-- 5 day retry period, with 4 attempts in the first
+ hour, two more within the first 6 hours, and then
+ every 6 hours for the rest of the period. -->
+ <delayTime> 5 minutes </delayTime>
+ <delayTime> 10 minutes </delayTime>
+ <delayTime> 45 minutes </delayTime>
+ <delayTime> 2 hours </delayTime>
+ <delayTime> 3 hours </delayTime>
+ <delayTime> 6 hours </delayTime>
+ <maxRetries> 25 </maxRetries>
<!-- The number of threads that should be trying to deliver outgoing messages -->
<deliveryThreads> 1 </deliveryThreads>
1.16.4.8 +1 -1 james-server/src/conf/sqlResources.xml
Index: sqlResources.xml
===================================================================
RCS file: /home/cvs/james-server/src/conf/sqlResources.xml,v
retrieving revision 1.16.4.7
retrieving revision 1.16.4.8
diff -u -r1.16.4.7 -r1.16.4.8
--- sqlResources.xml 18 Aug 2003 15:42:36 -0000 1.16.4.7
+++ sqlResources.xml 16 Nov 2003 21:47:24 -0000 1.16.4.8
@@ -346,7 +346,7 @@
<sql name="removeMessageSQL">DELETE FROM ${table} WHERE message_name = ? AND repository_name = ?</sql>
<!-- Statements used to list all messages stored in this repository. -->
- <sql name="listMessagesSQL">SELECT message_name, message_state, last_updated FROM ${table} WHERE repository_name = ? ORDER BY last_updated ASC</sql>
+ <sql name="listMessagesSQL">SELECT message_name, message_state, last_updated, error_message FROM ${table} WHERE repository_name = ? ORDER BY last_updated ASC</sql>
<!-- Statements used to create the table associated with this class. -->
<sql name="createTable" db="hypersonic">
No revision
No revision
1.7.4.6 +84 -35 james-server/src/java/org/apache/james/mailrepository/AvalonSpoolRepository.java
Index: AvalonSpoolRepository.java
===================================================================
RCS file: /home/cvs/james-server/src/java/org/apache/james/mailrepository/AvalonSpoolRepository.java,v
retrieving revision 1.7.4.5
retrieving revision 1.7.4.6
diff -u -r1.7.4.5 -r1.7.4.6
--- AvalonSpoolRepository.java 28 Aug 2003 16:22:37 -0000 1.7.4.5
+++ AvalonSpoolRepository.java 16 Nov 2003 21:47:24 -0000 1.7.4.6
@@ -81,15 +81,15 @@
implements SpoolRepository {
/**
- * <p>Returns the key for an arbitrarily selected mail deposited in this Repository.
+ * <p>Returns an arbitrarily selected mail deposited in this Repository.
* Usage: SpoolManager calls accept() to see if there are any unprocessed
* mails in the spool repository.</p>
*
* <p>Synchronized to ensure thread safe access to the underlying spool.</p>
*
- * @return the key for the mail
+ * @return the mail
*/
- public synchronized String accept() throws InterruptedException {
+ public synchronized Mail accept() throws InterruptedException {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("Method accept() called");
}
@@ -110,7 +110,18 @@
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("accept() has locked: " + s);
}
- return s;
+ try {
+ MailImpl mail = retrieve(s);
+ // Retrieve can return null if the mail is no longer on the spool
+ // (i.e. another thread has gotten to it first).
+ // In this case we simply continue to the next key
+ if (mail == null) {
+ continue;
+ }
+ return mail;
+ } catch (javax.mail.MessagingException e) {
+ getLogger().error("Exception during retrieve -- skipping item " + s, e);
+ }
}
}
@@ -126,7 +137,7 @@
}
/**
- * <p>Returns the key for an arbitrarily selected mail deposited in this Repository that
+ * <p>Returns an arbitrarily selected mail deposited in this Repository that
* is either ready immediately for delivery, or is younger than it's last_updated plus
* the number of failed attempts times the delay time.
* Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
@@ -134,14 +145,69 @@
*
* <p>Synchronized to ensure thread safe access to the underlying spool.</p>
*
- * @return the key for the mail
+ * @return the mail
*/
- public synchronized String accept(long delay) throws InterruptedException {
+ public synchronized Mail accept(final long delay) throws InterruptedException
+ {
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("Method accept(delay) called");
}
- while (!Thread.currentThread().isInterrupted()) {
+ return accept(new SpoolRepository.AcceptFilter () {
long youngest = 0;
+
+ public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
+ if (state.equals(Mail.ERROR)) {
+ //Test the time...
+ long timeToProcess = delay + lastUpdated;
+
+ if (System.currentTimeMillis() > timeToProcess) {
+ //We're ready to process this again
+ return true;
+ } else {
+ //We're not ready to process this.
+ if (youngest == 0 || youngest > timeToProcess) {
+ //Mark this as the next most likely possible mail to process
+ youngest = timeToProcess;
+ }
+ return false;
+ }
+ } else {
+ //This mail is good to go... return the key
+ return true;
+ }
+ }
+
+ public long getWaitTime () {
+ if (youngest == 0) {
+ return 0;
+ } else {
+ long duration = youngest - System.currentTimeMillis();
+ youngest = 0; //get ready for next round
+ return duration <= 0 ? 1 : duration;
+ }
+ }
+ });
+ }
+
+
+ /**
+ * Returns an arbitrarily select mail deposited in this Repository for
+ * which the supplied filter's accept method returns true.
+ * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
+ * based on number of retries if the mail is ready for processing.
+ * If no message is ready the method will block until one is, the amount of time to block is
+ * determined by calling the filters getWaitTime method.
+ *
+ * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
+ *
+ * @return the mail
+ */
+ public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException
+ {
+ if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
+ getLogger().debug("Method accept(Filter) called");
+ }
+ while (!Thread.currentThread().isInterrupted()) {
for (Iterator it = list(); it.hasNext(); ) {
String s = it.next().toString();
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
@@ -156,48 +222,30 @@
if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
getLogger().debug("accept(delay) has locked: " + s);
}
- //We have a lock on this object... let's grab the message
- // and see if it's a valid time.
-
MailImpl mail = null;
try {
mail = retrieve(s);
} catch (javax.mail.MessagingException e) {
getLogger().error("Exception during retrieve -- skipping item " + s, e);
}
- // Retrieve can return null if the mail is no longer in the store.
- // Or it could have throw an exception, which would would have logged.
+ // Retrieve can return null if the mail is no longer on the spool
+ // (i.e. another thread has gotten to it first).
// In this case we simply continue to the next key
if (mail == null) {
continue;
}
- if (mail.getState().equals(Mail.ERROR)) {
- //Test the time...
- long timeToProcess = delay + mail.getLastUpdated().getTime();
- if (System.currentTimeMillis() > timeToProcess) {
- //We're ready to process this again
- return s;
- } else {
- //We're not ready to process this.
- if (youngest == 0 || youngest > timeToProcess) {
- //Mark this as the next most likely possible mail to process
- youngest = timeToProcess;
- }
- }
- } else {
- //This mail is good to go... return the key
- return s;
+ if (filter.accept (mail.getName(),
+ mail.getState(),
+ mail.getLastUpdated().getTime(),
+ mail.getErrorMessage())) {
+ return mail;
}
+
}
}
//We did not find any... let's wait for a certain amount of time
try {
- if (youngest == 0) {
- wait();
- } else {
- long duration = youngest - System.currentTimeMillis();
- wait(duration <= 0 ? 1 : duration);
- }
+ wait (filter.getWaitTime());
} catch (InterruptedException ex) {
throw ex;
} catch (ConcurrentModificationException cme) {
@@ -207,4 +255,5 @@
}
throw new InterruptedException();
}
+
}
1.15.4.11 +80 -30 james-server/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
Index: JDBCSpoolRepository.java
===================================================================
RCS file: /home/cvs/james-server/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java,v
retrieving revision 1.15.4.10
retrieving revision 1.15.4.11
diff -u -r1.15.4.10 -r1.15.4.11
--- JDBCSpoolRepository.java 28 Aug 2003 16:22:37 -0000 1.15.4.10
+++ JDBCSpoolRepository.java 16 Nov 2003 21:47:24 -0000 1.15.4.11
@@ -159,16 +159,27 @@
}
/**
- * Return the key of a message to process. This is a message in the spool that is not locked.
+ * Return a message to process. This is a message in the spool that is not locked.
*/
- public String accept() throws InterruptedException {
+ public Mail accept() throws InterruptedException {
while (!Thread.currentThread().isInterrupted()) {
//Loop through until we are either out of pending messages or have a message
// that we can lock
PendingMessage next = null;
while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) {
if (lock(next.key)) {
- return next.key;
+ try {
+ MailImpl mail = retrieve(next.key);
+ // Retrieve can return null if the mail is no longer on the spool
+ // (i.e. another thread has gotten to it first).
+ // In this case we simply continue to the next key
+ if (mail == null) {
+ continue;
+ }
+ return mail;
+ } catch (javax.mail.MessagingException e) {
+ getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
+ }
}
}
//Nothing to do... sleep!
@@ -191,54 +202,90 @@
}
/**
- * Return the key of a message that's ready to process. If a message is of type "error"
+ * Return a message that's ready to process. If a message is of type "error"
* then check the last updated time, and don't try it until the long 'delay' parameter
* milliseconds has passed.
*/
- public synchronized String accept(long delay) throws InterruptedException {
- while (!Thread.currentThread().isInterrupted()) {
- //Loop through until we are either out of pending messages or have a message
- // that we can lock
- PendingMessage next = null;
+ public synchronized Mail accept(final long delay) throws InterruptedException {
+ return accept (new SpoolRepository.AcceptFilter () {
long sleepUntil = 0;
- while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) {
- //Check whether this is time to expire
- boolean shouldProcess = false;
- if (Mail.ERROR.equals(next.state)) {
+
+ public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
+ if (Mail.ERROR.equals(state)) {
//if it's an error message, test the time
- long processingTime = delay + next.lastUpdated;
+ long processingTime = delay + lastUpdated;
if (processingTime < System.currentTimeMillis()) {
//It's time to process
- shouldProcess = true;
+ return true;
} else {
//We don't process this, but we want to possibly reduce the amount of time
// we sleep so we wake when this message is ready.
if (sleepUntil == 0 || processingTime < sleepUntil) {
sleepUntil = processingTime;
}
+ return false;
}
} else {
- shouldProcess = true;
+ return true;
+ }
+ }
+
+
+ public long getWaitTime () {
+ if (sleepUntil == 0) {
+ sleepUntil = System.currentTimeMillis();
+ }
+ long waitTime = sleepUntil - System.currentTimeMillis();
+ sleepUntil = 0;
+ return waitTime <= 0 ? 1 : waitTime;
}
+
+ });
+ }
+
+ /**
+ * Returns an arbitrarily selected mail deposited in this Repository for
+ * which the supplied filter's accept method returns true.
+ * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
+ * based on number of retries if the mail is ready for processing.
+ * If no message is ready the method will block until one is, the amount of time to block is
+ * determined by calling the filters getWaitTime method.
+ *
+ * @return the mail
+ */
+ public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
+ while (!Thread.currentThread().isInterrupted()) {
+ //Loop through until we are either out of pending messages or have a message
+ // that we can lock
+ PendingMessage next = null;
+ while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) {
+ //Check whether this is time to expire
+
+ boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
+
if (shouldProcess && lock(next.key)) {
- return next.key;
+ try {
+ MailImpl mail = retrieve(next.key);
+ // Retrieve can return null if the mail is no longer on the spool
+ // (i.e. another thread has gotten to it first).
+ // In this case we simply continue to the next key
+ if (mail == null) {
+ continue;
+ }
+ return mail;
+ } catch (javax.mail.MessagingException e) {
+ getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
+ }
}
}
//Nothing to do... sleep!
- if (sleepUntil == 0) {
- sleepUntil = System.currentTimeMillis() + WAIT_LIMIT;
+ long wait_time = filter.getWaitTime();
+ if (wait_time <= 0) {
+ wait_time = WAIT_LIMIT;
}
try {
synchronized (this) {
- long waitTime = sleepUntil - System.currentTimeMillis();
- //StringBuffer errorBuffer =
- // new StringBuffer(128)
- // .append("waiting ")
- // .append((waitTime) / 1000L)
- // .append(" in ")
- // .append(repositoryName);
- //System.err.println(errorBuffer.toString());
- wait(waitTime <= 0 ? 1 : waitTime);
+ wait (wait_time);
}
} catch (InterruptedException ex) {
throw ex;
@@ -307,7 +354,8 @@
String key = rsListMessages.getString(1);
String state = rsListMessages.getString(2);
long lastUpdated = rsListMessages.getTimestamp(3).getTime();
- pendingMessages.add(new PendingMessage(key, state, lastUpdated));
+ String errorMessage = rsListMessages.getString(4);
+ pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
}
} catch (SQLException sqle) {
//Log it and avoid reloading for a bit
@@ -328,11 +376,13 @@
protected String key;
protected String state;
protected long lastUpdated;
+ protected String errorMessage;
- public PendingMessage(String key, String state, long lastUpdated) {
+ public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
this.key = key;
this.state = state;
this.lastUpdated = lastUpdated;
+ this.errorMessage = errorMessage;
}
}
}
No revision
No revision
1.2.4.4 +48 -6 james-server/src/java/org/apache/james/services/Attic/SpoolRepository.java
Index: SpoolRepository.java
===================================================================
RCS file: /home/cvs/james-server/src/java/org/apache/james/services/Attic/SpoolRepository.java,v
retrieving revision 1.2.4.3
retrieving revision 1.2.4.4
diff -u -r1.2.4.3 -r1.2.4.4
--- SpoolRepository.java 8 Mar 2003 21:54:06 -0000 1.2.4.3
+++ SpoolRepository.java 16 Nov 2003 21:47:24 -0000 1.2.4.4
@@ -58,6 +58,8 @@
package org.apache.james.services;
+import org.apache.mailet.Mail;
+
/**
* Interface for a Repository for Spooling Mails.
* A spool repository is a transitory repository which should empty itself
@@ -69,28 +71,68 @@
extends MailRepository {
/**
+ * Implementations of AcceptFilter can be used to select which mails a SpoolRepository
+ * implementation returns from its accept (AcceptFilter) method
+ **/
+ public static interface AcceptFilter
+ {
+ /**
+ * This method is called by accept(Filter) to determine if the message is
+ * ready for delivery.
+ *
+ * @param key message key
+ * @param state the state of the message
+ * @param lastUpdated the last time the message was written to the spool
+ * @param errorMessage the current errorMessage
+ * @return true if the message is ready for delivery
+ **/
+ boolean accept (String key, String state, long lastUpdated, String errorMessage) ;
+
+
+ /**
+ * This method allows the filter to determine how long the thread should wait for a
+ * message to get ready for delivery, when currently there are none.
+ * @return the time to wait for a message to get ready for delivery
+ **/
+ long getWaitTime ();
+ }
+
+ /**
* Define a STREAM repository. Streams are stored in the specified
* destination.
*/
String SPOOL = "SPOOL";
/**
- * Returns the key for an arbitrarily selected mail deposited in this Repository.
+ * Returns an arbitrarily selected mail deposited in this Repository.
* Usage: SpoolManager calls accept() to see if there are any unprocessed
* mails in the spool repository.
*
- * @return the key for the mail
+ * @return the mail
*/
- String accept() throws InterruptedException;
+ Mail accept() throws InterruptedException;
/**
- * Returns the key for an arbitrarily select mail deposited in this Repository that
+ * Returns an arbitrarily select mail deposited in this Repository that
* is either ready immediately for delivery, or is younger than it's last_updated plus
* the number of failed attempts times the delay time.
* Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
* unprocessed mail is available.
*
- * @return the key for the mail
+ * @return the mail
*/
- String accept(long delay) throws InterruptedException;
+ Mail accept(long delay) throws InterruptedException;
+
+ /**
+ * Returns an arbitrarily select mail deposited in this Repository for
+ * which the supplied filter's accept method returns true.
+ * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
+ * based on number of retries if the mail is ready for processing.
+ * If no message is ready the method will block until one is, the amount of time to block is
+ * determined by calling the filters getWaitTime method.
+ *
+ * @return the mail
+ */
+ Mail accept(AcceptFilter filter) throws InterruptedException;
+
}
No revision
No revision
1.20.4.12 +3 -9 james-server/src/java/org/apache/james/transport/JamesSpoolManager.java
Index: JamesSpoolManager.java
===================================================================
RCS file: /home/cvs/james-server/src/java/org/apache/james/transport/JamesSpoolManager.java,v
retrieving revision 1.20.4.11
retrieving revision 1.20.4.12
diff -u -r1.20.4.11 -r1.20.4.12
--- JamesSpoolManager.java 20 Oct 2003 08:18:29 -0000 1.20.4.11
+++ JamesSpoolManager.java 16 Nov 2003 21:47:24 -0000 1.20.4.12
@@ -350,14 +350,8 @@
while(true) {
String key = null;
try {
- key = spool.accept();
- MailImpl mail = spool.retrieve(key);
- // Retrieve can return null if the mail is no longer on the spool
- // (i.e. another thread has gotten to it first).
- // In this case we simply continue to the next key
- if (mail == null) {
- continue;
- }
+ MailImpl mail = (MailImpl)spool.accept();
+ key = mail.getName();
if (getLogger().isDebugEnabled()) {
StringBuffer debugBuffer =
new StringBuffer(64)
No revision
No revision
1.33.4.14 +287 -12 james-server/src/java/org/apache/james/transport/mailets/RemoteDelivery.java
Index: RemoteDelivery.java
===================================================================
RCS file: /home/cvs/james-server/src/java/org/apache/james/transport/mailets/RemoteDelivery.java,v
retrieving revision 1.33.4.13
retrieving revision 1.33.4.14
diff -u -r1.33.4.13 -r1.33.4.14
--- RemoteDelivery.java 28 Aug 2003 16:22:37 -0000 1.33.4.13
+++ RemoteDelivery.java 16 Nov 2003 21:47:24 -0000 1.33.4.14
@@ -68,6 +68,7 @@
import java.util.Collection;
import java.util.Date;
import java.util.Hashtable;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.Locale;
import java.util.Properties;
@@ -98,12 +99,19 @@
import org.apache.mailet.Mail;
import org.apache.mailet.MailAddress;
+import org.apache.oro.text.regex.MalformedPatternException;
+import org.apache.oro.text.regex.Pattern;
+import org.apache.oro.text.regex.Perl5Compiler;
+import org.apache.oro.text.regex.Perl5Matcher;
+import org.apache.oro.text.regex.MatchResult;
+
+
/**
* Receives a MessageContainer from JamesSpoolManager and takes care of delivery
* the message to remote hosts. If for some reason mail can't be delivered
- * store it in the "outgoing" Repository and set an Alarm. After "delayTime" the
+ * store it in the "outgoing" Repository and set an Alarm. After the next "delayTime" the
* Alarm will wake the servlet that will try to send it again. After "maxRetries"
- * the mail will be considered underiverable and will be returned to sender.
+ * the mail will be considered undeliverable and will be returned to sender.
*
* TO DO (in priority):
* 1. Support a gateway (a single server where all mail will be delivered) (DONE)
@@ -123,13 +131,109 @@
*/
public class RemoteDelivery extends GenericMailet implements Runnable {
+ private static final long DEFAULT_DELAY_TIME = 21600000; // default is 6*60*60*1000 millis (6 hours)
+ private static final String PATTERN_STRING =
+ "\\s*([0-9]*\\s*[\\*])?\\s*([0-9]+)\\s*([a-z,A-Z]*)\\s*";//pattern to match
+ //[attempts*]delay[units]
+
+ private static Pattern PATTERN = null; //the compiled pattern of the above String
+ private static final HashMap MULTIPLIERS = new HashMap (10); //holds allowed units for delaytime together with
+ //the factor to turn it into the equivalent time in msec
+
+ /*
+ * Static initializer.<p>
+ * Compiles pattern for processing delaytime entries.<p>
+ * Initializes MULTIPLIERS with the supported unit quantifiers
+ */
+ static {
+ try {
+ Perl5Compiler compiler = new Perl5Compiler();
+ PATTERN = compiler.compile(PATTERN_STRING, Perl5Compiler.READ_ONLY_MASK);
+ } catch(MalformedPatternException mpe) {
+ //this should not happen as the pattern string is hardcoded.
+ System.err.println ("Malformed pattern: " + PATTERN_STRING);
+ mpe.printStackTrace (System.err);
+ }
+ //add allowed units and their respective multiplier
+ MULTIPLIERS.put ("msec", new Integer (1));
+ MULTIPLIERS.put ("msecs", new Integer (1));
+ MULTIPLIERS.put ("sec", new Integer (1000));
+ MULTIPLIERS.put ("secs", new Integer (1000));
+ MULTIPLIERS.put ("minute", new Integer (1000*60));
+ MULTIPLIERS.put ("minutes", new Integer (1000*60));
+ MULTIPLIERS.put ("hour", new Integer (1000*60*60));
+ MULTIPLIERS.put ("hours", new Integer (1000*60*60));
+ MULTIPLIERS.put ("day", new Integer (1000*60*60*24));
+ MULTIPLIERS.put ("days", new Integer (1000*60*60*24));
+ }
+
+ /**
+ * This filter is used in the accept call to the spool.
+ * It will select the next mail ready for processing according to the mails
+ * retrycount and lastUpdated time
+ **/
+ private class MultipleDelayFilter implements SpoolRepository.AcceptFilter
+ {
+ /**
+ * holds the time to wait for the youngest mail to get ready for processing
+ **/
+ long youngest = 0;
+
+ /**
+ * Uses the getNextDelay to determine if a mail is ready for processing based on the delivered parameters
+ * errorMessage (which holds the retrycount), lastUpdated and state
+ * @param key the name/key of the message
+ * @param state the mails state
+ * @param lastUpdated the mail was last written to the spool at this time.
+ * @param errorMessage actually holds the retrycount as a string (see failMessage below)
+ **/
+ public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
+ if (state.equals(Mail.ERROR)) {
+ //Test the time...
+ int retries = Integer.parseInt(errorMessage);
+ long delay = getNextDelay (retries);
+ long timeToProcess = delay + lastUpdated;
+
+
+ if (System.currentTimeMillis() > timeToProcess) {
+ //We're ready to process this again
+ return true;
+ } else {
+ //We're not ready to process this.
+ if (youngest == 0 || youngest > timeToProcess) {
+ //Mark this as the next most likely possible mail to process
+ youngest = timeToProcess;
+ }
+ return false;
+ }
+ } else {
+ //This mail is good to go... return the key
+ return true;
+ }
+ }
+
+ /**
+ * @return the optimal time the SpoolRepository.accept(AcceptFilter) method should wait before
+ * trying to find a mail ready for processing again.
+ **/
+ public long getWaitTime () {
+ if (youngest == 0) {
+ return 0;
+ } else {
+ long duration = youngest - System.currentTimeMillis();
+ youngest = 0; //get ready for next run
+ return duration <= 0 ? 1 : duration;
+ }
+ }
+ }
+
/**
* Controls certain log messages
*/
private boolean isDebug = false;
private SpoolRepository outgoing; // The spool of outgoing mail
- private long delayTime = 21600000; // default is 6*60*60*1000 millis (6 hours)
+ private long[] delayTimes; //holds expanded delayTimes
private int maxRetries = 5; // default number of retries
private long smtpTimeout = 600000; //default number of ms to timeout on smtp delivery
private boolean sendPartial = false; // If false then ANY address errors will cause the transmission to fail
@@ -145,14 +249,28 @@
private MailServer mailServer;
private volatile boolean destroyed = false; //Flag that the run method will check and end itself if set to true
+ private Perl5Matcher delayTimeMatcher; //matcher use at init time to parse delaytime parameters
+ private MultipleDelayFilter delayFilter = new MultipleDelayFilter ();//used by accept to selcet the next mail ready for processing
+
/**
* Initialize the mailet
*/
public void init() throws MessagingException {
isDebug = (getInitParameter("debug") == null) ? false : new Boolean(getInitParameter("debug")).booleanValue();
+ ArrayList delay_times_list = new ArrayList();
try {
if (getInitParameter("delayTime") != null) {
- delayTime = Long.parseLong(getInitParameter("delayTime"));
+ delayTimeMatcher = new Perl5Matcher();
+ String delay_times = getInitParameter("delayTime");
+ //split on comma's
+ StringTokenizer st = new StringTokenizer (delay_times,",");
+ while (st.hasMoreTokens()) {
+ String delay_time = st.nextToken();
+ delay_times_list.add (new Delay(delay_time));
+ }
+ } else {
+ //use default delayTime.
+ delay_times_list.add (new Delay());
}
} catch (Exception e) {
log("Invalid delayTime setting: " + getInitParameter("delayTime"));
@@ -161,8 +279,30 @@
if (getInitParameter("maxRetries") != null) {
maxRetries = Integer.parseInt(getInitParameter("maxRetries"));
}
+ //check consistency with delay_times_list attempts
+ int total_attempts = calcTotalAttempts (delay_times_list);
+ if (total_attempts > maxRetries) {
+ log("Total number of delayTime attempts exceeds maxRetries specified. Increasing maxRetries from "+maxRetries+" to "+total_attempts);
+ maxRetries = total_attempts;
+ } else {
+ int extra = maxRetries - total_attempts;
+ if (extra != 0) {
+ log("maxRetries is larger than total number of attempts specified. Increasing last delayTime with "+extra+" attempts ");
+
+ if (delay_times_list.size() != 0) {
+ Delay delay = (Delay)delay_times_list.get (delay_times_list.size()-1); //last Delay
+ delay.setAttempts (delay.getAttempts()+extra);
+ log("Delay of "+delay.getDelayTime()+" msecs is now attempted: "+delay.getAttempts()+" times");
+ } else {
+ log ("NO, delaytimes cannot continue");
+ }
+ }
+ }
+ delayTimes = expandDelays (delay_times_list);
+
} catch (Exception e) {
log("Invalid maxRetries setting: " + getInitParameter("maxRetries"));
+ e.printStackTrace();
}
try {
if (getInitParameter("timeout") != null) {
@@ -778,7 +918,8 @@
try {
while (!Thread.currentThread().interrupted() && !destroyed) {
try {
- String key = outgoing.accept(delayTime);
+ MailImpl mail = (MailImpl)outgoing.accept(delayFilter);
+ String key = mail.getName();
try {
if (isDebug) {
StringBuffer logMessageBuffer =
@@ -788,12 +929,6 @@
.append(key);
log(logMessageBuffer.toString());
}
- MailImpl mail = outgoing.retrieve(key);
- // Retrieve can return null if the mail is no longer on the outgoing spool.
- // In this case we simply continue to the next key
- if (mail == null) {
- continue;
- }
if (deliver(mail, session)) {
//Message was successfully delivered/fully failed... delete it
outgoing.remove(key);
@@ -818,4 +953,144 @@
Thread.currentThread().interrupted();
}
}
+
+ /**
+ * @param list holding Delay objects
+ * @return the total attempts for all delays
+ **/
+ private int calcTotalAttempts (ArrayList list) {
+ int sum = 0;
+ Iterator i = list.iterator();
+ while (i.hasNext()) {
+ Delay delay = (Delay)i.next();
+ sum += delay.getAttempts();
+ }
+ return sum;
+ }
+
+ /**
+ * This method expands an ArrayList containing Delay objects into an array holding the
+ * only delaytime in the order.<p>
+ * So if the list has 2 Delay objects the first having attempts=2 and delaytime 4000
+ * the second having attempts=1 and delaytime=300000 will be expanded into this array:<p>
+ * long[0] = 4000<p>
+ * long[1] = 4000<p>
+ * long[2] = 300000<p>
+ * @param list the list to expand
+ * @return the expanded list
+ **/
+ private long[] expandDelays (ArrayList list) {
+ long[] delays = new long [calcTotalAttempts(list)];
+ Iterator i = list.iterator();
+ int idx = 0;
+ while (i.hasNext()) {
+ Delay delay = (Delay)i.next();
+ for (int j=0; j<delay.getAttempts(); j++) {
+ delays[idx++]= delay.getDelayTime();
+ }
+ }
+ return delays;
+ }
+
+ /**
+ * This method returns, given a retry-count, the next delay time to use.
+ * @param retry_count the current retry_count.
+ * @return the next delay time to use, given the retry count
+ **/
+ private long getNextDelay (int retry_count) {
+ return delayTimes[retry_count-1];
+ }
+
+ /**
+ * This class is used to hold a delay time and its corresponding number
+ * of retries.
+ **/
+ private class Delay {
+ private int attempts = 1;
+ private long delayTime = DEFAULT_DELAY_TIME;
+
+
+ /**
+ * This constructor expects Strings of the form "[attempt\*]delaytime[unit]". <p>
+ * The optional attempt is the number of tries this delay should be used (default = 1)
+ * The unit if present must be one of (msec,sec,minute,hour,day) (default = msec)
+ * The constructor multiplies the delaytime by the relevant multiplier for the unit,
+ * so the delayTime instance variable is always in msec.
+ * @param init_string the string to initialize this Delay object from
+ **/
+ public Delay (String init_string) throws MessagingException
+ {
+ String unit = "msec"; //default unit
+ if (delayTimeMatcher.matches (init_string, PATTERN)) {
+ MatchResult res = delayTimeMatcher.getMatch ();
+ //the capturing groups will now hold
+ //at 1: attempts * (if present)
+ //at 2: delaytime
+ //at 3: unit (if present)
+
+ if (res.group(1) != null && !res.group(1).equals ("")) {
+ //we have an attempt *
+ String attempt_match = res.group(1);
+ //strip the * and whitespace
+ attempt_match = attempt_match.substring (0,attempt_match.length()-1).trim();
+ attempts = Integer.parseInt (attempt_match);
+ }
+
+ delayTime = Long.parseLong (res.group(2));
+
+ if (!res.group(3).equals ("")) {
+ //we have a unit
+ unit = res.group(3).toLowerCase();
+ }
+ } else {
+ throw new MessagingException(init_string+" does not match "+PATTERN_STRING);
+ }
+ if (MULTIPLIERS.get (unit)!=null) {
+ int multiplier = ((Integer)MULTIPLIERS.get (unit)).intValue();
+ delayTime *= multiplier;
+ } else {
+ throw new MessagingException("Unknown unit: "+unit);
+ }
+ }
+
+ /**
+ * This constructor makes a default Delay object, ie. attempts=1 and delayTime=DEFAULT_DELAY_TIME
+ **/
+ public Delay () {
+ }
+
+ /**
+ * @return the delayTime for this Delay
+ **/
+ public long getDelayTime () {
+ return delayTime;
+ }
+
+ /**
+ * @return the number attempts this Delay should be used.
+ **/
+ public int getAttempts () {
+ return attempts;
+ }
+
+ /**
+ * Set the number attempts this Delay should be used.
+ **/
+ public void setAttempts (int value) {
+ attempts = value;
+ }
+
+ /**
+ * Pretty prints this Delay
+ **/
+ public String toString () {
+ StringBuffer buf = new StringBuffer(15);
+ buf.append (getAttempts ());
+ buf.append ('*');
+ buf.append (getDelayTime());
+ buf.append ("msec");
+ return buf.toString();
+ }
+ }
+
}
No revision
No revision
1.5.4.6 +16 -3 james-server/src/xdocs/provided_mailets_2_1.xml
Index: provided_mailets_2_1.xml
===================================================================
RCS file: /home/cvs/james-server/src/xdocs/provided_mailets_2_1.xml,v
retrieving revision 1.5.4.5
retrieving revision 1.5.4.6
diff -u -r1.5.4.5 -r1.5.4.6
--- provided_mailets_2_1.xml 29 May 2003 20:39:44 -0000 1.5.4.5
+++ provided_mailets_2_1.xml 16 Nov 2003 21:47:24 -0000 1.5.4.6
@@ -162,10 +162,23 @@
<ul>
<li><strong>outgoing</strong> (required) - The URL for the repository that will hold messages being processed
by the RemoteDelivery Mailet.</li>
-<li><strong>delayTime</strong> (optional) - a non-negative Long value that is the time in
-milliseconds between redelivery attempts for a particular mail. Defaults to six hours.</li>
+<li><strong>delayTime</strong> (optional) - takes the form
+<attempts*delay quantifier> where attempts and quantifier are both
+optional an defaults to 1 and miliseconds respectively.
+The delay is a non-negative value that is the time between redelivery
+attempts for a particular mail. Defaults to six hours. Allowed
+quantifiers are msec(s),sec(s),minute(s),hour(s),day(s). The attempts
+is the number of times the specified delay is used. It is possible to
+have multiple entries of this parameter, and if so the the delayTimes
+are used in the order specified, this allows you to try a few times
+in rapid succession and then move on to longer delays</li>
<li><strong>maxRetries</strong> (optional) - a non-negative Integer value that is number of times
-the Mailet will attempt to deliver a particular mail. Defaults to five.</li>
+the Mailet will attempt to deliver a particular mail. If inconsistent
+with number of attempts in <strong>delayTime</strong>
+specifications. <strong>maxRetries<strong> will be increased if it is
+less than the total number of attempts in <strong>delayTime</strong>,
+if it is more the last specified <strong>delayTime</strong> is used
+for the missing delays. Defaults to five. </li>
<li><strong>timeout</strong> (optional) - The SMTP connection timeout for SMTP connections generated
by this Mailet. Defaults to 60 seconds.</li>
<li><strong>deliveryThreads</strong> (optional) - The number of threads this Mailet will use to generate
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org