You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2003/11/16 22:47:24 UTC

cvs commit: james-server/src/xdocs provided_mailets_2_1.xml

noel        2003/11/16 13:47:24

  Modified:    src/conf Tag: branch_2_1_fcs james-config.xml
                        sqlResources.xml
               src/java/org/apache/james/mailrepository Tag: branch_2_1_fcs
                        AvalonSpoolRepository.java JDBCSpoolRepository.java
               src/java/org/apache/james/services Tag: branch_2_1_fcs
                        SpoolRepository.java
               src/java/org/apache/james/transport Tag: branch_2_1_fcs
                        JamesSpoolManager.java
               src/java/org/apache/james/transport/mailets Tag:
                        branch_2_1_fcs RemoteDelivery.java
               src/xdocs Tag: branch_2_1_fcs provided_mailets_2_1.xml
  Log:
  Commit Soren Hilmer's change to support a flexible retry schedule for RemoteDelivery.
  
  This commit also includes a concurrent change to SpoolRepository so that accept() is a one-step process that returns the Mail, rather than an unsynchronized two-step process.  Another related change is that accept() takes a filter object to control the next message to be returned, rather than hardcoding the algorithm.
  
  Revision  Changes    Path
  No                   revision
  No                   revision
  1.40.2.20 +12 -6     james-server/src/conf/james-config.xml
  
  Index: james-config.xml
  ===================================================================
  RCS file: /home/cvs/james-server/src/conf/james-config.xml,v
  retrieving revision 1.40.2.19
  retrieving revision 1.40.2.20
  diff -u -r1.40.2.19 -r1.40.2.20
  --- james-config.xml	20 Oct 2003 03:14:15 -0000	1.40.2.19
  +++ james-config.xml	16 Nov 2003 21:47:24 -0000	1.40.2.20
  @@ -299,11 +299,17 @@
               <outgoing> db://maildb/spool/outgoing </outgoing>
               -->
   
  -            <!-- Number of milliseconds between delivery attempts -->
  -            <delayTime> 21600000 </delayTime>
  -
  -            <!-- Number of failed attempts before returning to the sender -->
  -            <maxRetries> 5 </maxRetries>
  +            <!-- Delivery Schedule based upon RFC 2821, 4.5.4.1 -->
  +            <!-- 5 day retry period, with 4 attempts in the first
  +                 hour, two more within the first 6 hours, and then
  +                 every 6 hours for the rest of the period. -->
  +            <delayTime>  5 minutes </delayTime>
  +            <delayTime> 10 minutes </delayTime>
  +            <delayTime> 45 minutes </delayTime>
  +            <delayTime>  2 hours </delayTime>
  +            <delayTime>  3 hours </delayTime>
  +            <delayTime>  6 hours </delayTime>
  +            <maxRetries> 25 </maxRetries>
   
               <!-- The number of threads that should be trying to deliver outgoing messages -->
               <deliveryThreads> 1 </deliveryThreads>
  
  
  
  1.16.4.8  +1 -1      james-server/src/conf/sqlResources.xml
  
  Index: sqlResources.xml
  ===================================================================
  RCS file: /home/cvs/james-server/src/conf/sqlResources.xml,v
  retrieving revision 1.16.4.7
  retrieving revision 1.16.4.8
  diff -u -r1.16.4.7 -r1.16.4.8
  --- sqlResources.xml	18 Aug 2003 15:42:36 -0000	1.16.4.7
  +++ sqlResources.xml	16 Nov 2003 21:47:24 -0000	1.16.4.8
  @@ -346,7 +346,7 @@
       <sql name="removeMessageSQL">DELETE FROM ${table} WHERE message_name = ? AND repository_name = ?</sql>
   
       <!-- Statements used to list all messages stored in this repository. -->
  -    <sql name="listMessagesSQL">SELECT message_name, message_state, last_updated FROM ${table} WHERE repository_name = ? ORDER BY last_updated ASC</sql>
  +    <sql name="listMessagesSQL">SELECT message_name, message_state, last_updated, error_message FROM ${table} WHERE repository_name = ? ORDER BY last_updated ASC</sql>
   
       <!-- Statements used to create the table associated with this class. -->
       <sql name="createTable" db="hypersonic">
  
  
  
  No                   revision
  No                   revision
  1.7.4.6   +84 -35    james-server/src/java/org/apache/james/mailrepository/AvalonSpoolRepository.java
  
  Index: AvalonSpoolRepository.java
  ===================================================================
  RCS file: /home/cvs/james-server/src/java/org/apache/james/mailrepository/AvalonSpoolRepository.java,v
  retrieving revision 1.7.4.5
  retrieving revision 1.7.4.6
  diff -u -r1.7.4.5 -r1.7.4.6
  --- AvalonSpoolRepository.java	28 Aug 2003 16:22:37 -0000	1.7.4.5
  +++ AvalonSpoolRepository.java	16 Nov 2003 21:47:24 -0000	1.7.4.6
  @@ -81,15 +81,15 @@
       implements SpoolRepository {
   
       /**
  -     * <p>Returns the key for an arbitrarily selected mail deposited in this Repository.
  +     * <p>Returns an arbitrarily selected mail deposited in this Repository.
        * Usage: SpoolManager calls accept() to see if there are any unprocessed 
        * mails in the spool repository.</p>
        *
        * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
        *
  -     * @return the key for the mail
  +     * @return the mail
        */
  -    public synchronized String accept() throws InterruptedException {
  +    public synchronized Mail accept() throws InterruptedException {
           if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
               getLogger().debug("Method accept() called");
           }
  @@ -110,7 +110,18 @@
                           if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
                               getLogger().debug("accept() has locked: " + s);
                           }
  -                        return s;
  +                        try {
  +                            MailImpl mail = retrieve(s);
  +                            // Retrieve can return null if the mail is no longer on the spool
  +                            // (i.e. another thread has gotten to it first).
  +                            // In this case we simply continue to the next key
  +                            if (mail == null) {
  +                                continue;
  +                            }
  +                            return mail;
  +                        } catch (javax.mail.MessagingException e) {
  +                            getLogger().error("Exception during retrieve -- skipping item " + s, e);
  +                        }
                       }
                   }
   
  @@ -126,7 +137,7 @@
       }
   
       /**
  -     * <p>Returns the key for an arbitrarily selected mail deposited in this Repository that
  +     * <p>Returns an arbitrarily selected mail deposited in this Repository that
        * is either ready immediately for delivery, or is younger than it's last_updated plus
        * the number of failed attempts times the delay time.
        * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
  @@ -134,14 +145,69 @@
        *
        * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
        *
  -     * @return the key for the mail
  +     * @return the mail
        */
  -    public synchronized String accept(long delay) throws InterruptedException {
  +    public synchronized Mail accept(final long delay) throws InterruptedException
  +    {
           if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
               getLogger().debug("Method accept(delay) called");
           }
  -        while (!Thread.currentThread().isInterrupted()) {
  +        return accept(new SpoolRepository.AcceptFilter () {
               long youngest = 0;
  +                
  +                public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
  +                    if (state.equals(Mail.ERROR)) {
  +                        //Test the time...
  +                        long timeToProcess = delay + lastUpdated;
  +                
  +                        if (System.currentTimeMillis() > timeToProcess) {
  +                            //We're ready to process this again
  +                            return true;
  +                        } else {
  +                            //We're not ready to process this.
  +                            if (youngest == 0 || youngest > timeToProcess) {
  +                                //Mark this as the next most likely possible mail to process
  +                                youngest = timeToProcess;
  +                            }
  +                            return false;
  +                        }
  +                    } else {
  +                        //This mail is good to go... return the key
  +                        return true;
  +                    }
  +                }
  +        
  +                public long getWaitTime () {
  +                    if (youngest == 0) {
  +                        return 0;
  +                    } else {
  +                        long duration = youngest - System.currentTimeMillis();
  +                        youngest = 0; //get ready for next round
  +                        return duration <= 0 ? 1 : duration;
  +                    }
  +                }
  +            });
  +    }
  +
  +
  +    /**
  +     * Returns an arbitrarily select mail deposited in this Repository for
  +     * which the supplied filter's accept method returns true.
  +     * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
  +     * based on number of retries if the mail is ready for processing.
  +     * If no message is ready the method will block until one is, the amount of time to block is
  +     * determined by calling the filters getWaitTime method.
  +     *
  +     * <p>Synchronized to ensure thread safe access to the underlying spool.</p>
  +     *
  +     * @return  the mail
  +     */
  +    public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException
  +    {
  +        if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
  +            getLogger().debug("Method accept(Filter) called");
  +        }
  +        while (!Thread.currentThread().isInterrupted()) {
               for (Iterator it = list(); it.hasNext(); ) {
                   String s = it.next().toString();
                   if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
  @@ -156,48 +222,30 @@
                       if ((DEEP_DEBUG) && (getLogger().isDebugEnabled())) {
                           getLogger().debug("accept(delay) has locked: " + s);
                       }
  -                    //We have a lock on this object... let's grab the message
  -                    //  and see if it's a valid time.
  -
                       MailImpl mail = null;
                       try {
                           mail = retrieve(s);
                       } catch (javax.mail.MessagingException e) {
                           getLogger().error("Exception during retrieve -- skipping item " + s, e);
                       }
  -                    // Retrieve can return null if the mail is no longer in the store.
  -                    // Or it could have throw an exception, which would would have logged.
  +                    // Retrieve can return null if the mail is no longer on the spool
  +                    // (i.e. another thread has gotten to it first).
                       // In this case we simply continue to the next key
                       if (mail == null) {
                           continue;
                       }
  -                    if (mail.getState().equals(Mail.ERROR)) {
  -                        //Test the time...
  -                        long timeToProcess = delay + mail.getLastUpdated().getTime();
  -                        if (System.currentTimeMillis() > timeToProcess) {
  -                            //We're ready to process this again
  -                            return s;
  -                        } else {
  -                            //We're not ready to process this.
  -                            if (youngest == 0 || youngest > timeToProcess) {
  -                                //Mark this as the next most likely possible mail to process
  -                                youngest = timeToProcess;
  -                            }
  -                        }
  -                    } else {
  -                        //This mail is good to go... return the key
  -                        return s;
  +                    if (filter.accept (mail.getName(),
  +                                       mail.getState(),
  +                                       mail.getLastUpdated().getTime(),
  +                                       mail.getErrorMessage())) {
  +                        return mail;
                       }
  +                    
                   }
               }
               //We did not find any... let's wait for a certain amount of time
               try {
  -                if (youngest == 0) {
  -                    wait();
  -                } else {
  -                    long duration = youngest - System.currentTimeMillis();
  -                    wait(duration <= 0 ? 1 : duration);
  -                }
  +                wait (filter.getWaitTime());
               } catch (InterruptedException ex) {
                   throw ex;
               } catch (ConcurrentModificationException cme) {
  @@ -207,4 +255,5 @@
           }
           throw new InterruptedException();
       }
  +    
   }
  
  
  
  1.15.4.11 +80 -30    james-server/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java
  
  Index: JDBCSpoolRepository.java
  ===================================================================
  RCS file: /home/cvs/james-server/src/java/org/apache/james/mailrepository/JDBCSpoolRepository.java,v
  retrieving revision 1.15.4.10
  retrieving revision 1.15.4.11
  diff -u -r1.15.4.10 -r1.15.4.11
  --- JDBCSpoolRepository.java	28 Aug 2003 16:22:37 -0000	1.15.4.10
  +++ JDBCSpoolRepository.java	16 Nov 2003 21:47:24 -0000	1.15.4.11
  @@ -159,16 +159,27 @@
       }
   
       /**
  -     * Return the key of a message to process.  This is a message in the spool that is not locked.
  +     * Return a message to process.  This is a message in the spool that is not locked.
        */
  -    public String accept() throws InterruptedException {
  +    public Mail accept() throws InterruptedException {
           while (!Thread.currentThread().isInterrupted()) {
               //Loop through until we are either out of pending messages or have a message
               // that we can lock
               PendingMessage next = null;
               while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) {
                   if (lock(next.key)) {
  -                    return next.key;
  +                    try {
  +                        MailImpl mail = retrieve(next.key);
  +                        // Retrieve can return null if the mail is no longer on the spool
  +                        // (i.e. another thread has gotten to it first).
  +                        // In this case we simply continue to the next key
  +                        if (mail == null) {
  +                            continue;
  +                        }
  +                        return mail;
  +                    } catch (javax.mail.MessagingException e) {
  +                        getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
  +                    }
                   }
               }
               //Nothing to do... sleep!
  @@ -191,54 +202,90 @@
       }
   
       /**
  -     * Return the key of a message that's ready to process.  If a message is of type "error"
  +     * Return a message that's ready to process.  If a message is of type "error"
        * then check the last updated time, and don't try it until the long 'delay' parameter
        * milliseconds has passed.
        */
  -    public synchronized String accept(long delay) throws InterruptedException {
  -        while (!Thread.currentThread().isInterrupted()) {
  -            //Loop through until we are either out of pending messages or have a message
  -            // that we can lock
  -            PendingMessage next = null;
  +    public synchronized Mail accept(final long delay) throws InterruptedException {
  +        return accept (new SpoolRepository.AcceptFilter () {
               long sleepUntil = 0;
  -            while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) {
  -                //Check whether this is time to expire
  -                boolean shouldProcess = false;
  -                if (Mail.ERROR.equals(next.state)) {
  +                
  +                public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
  +                    if (Mail.ERROR.equals(state)) {
                       //if it's an error message, test the time
  -                    long processingTime = delay + next.lastUpdated;
  +                        long processingTime = delay + lastUpdated;
                       if (processingTime < System.currentTimeMillis()) {
                           //It's time to process
  -                        shouldProcess = true;
  +                            return true;
                       } else {
                           //We don't process this, but we want to possibly reduce the amount of time
                           //  we sleep so we wake when this message is ready.
                           if (sleepUntil == 0 || processingTime < sleepUntil) {
                               sleepUntil = processingTime;
                           }
  +                            return false;
                       }
                   } else {
  -                    shouldProcess = true;
  +                        return true;
  +                    }
  +                }
  +                
  +
  +                public long getWaitTime () {
  +                    if (sleepUntil == 0) {
  +                        sleepUntil = System.currentTimeMillis();
  +                    }
  +                    long waitTime = sleepUntil - System.currentTimeMillis();
  +                    sleepUntil = 0;
  +                    return waitTime <= 0 ? 1 : waitTime;
                   }
  +                
  +            });
  +    }
  +
  +    /**
  +     * Returns an arbitrarily selected mail deposited in this Repository for
  +     * which the supplied filter's accept method returns true.
  +     * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
  +     * based on number of retries if the mail is ready for processing.
  +     * If no message is ready the method will block until one is, the amount of time to block is
  +     * determined by calling the filters getWaitTime method.
  +     *
  +     * @return  the mail
  +     */
  +    public synchronized Mail accept(SpoolRepository.AcceptFilter filter) throws InterruptedException {
  +        while (!Thread.currentThread().isInterrupted()) {
  +            //Loop through until we are either out of pending messages or have a message
  +            // that we can lock
  +            PendingMessage next = null;
  +            while ((next = getNextPendingMessage()) != null && !Thread.currentThread().isInterrupted()) {
  +                //Check whether this is time to expire
  +                
  +                boolean shouldProcess = filter.accept (next.key, next.state, next.lastUpdated, next.errorMessage);
  +                
                   if (shouldProcess && lock(next.key)) {
  -                    return next.key;
  +                    try {
  +                        MailImpl mail = retrieve(next.key);
  +                        // Retrieve can return null if the mail is no longer on the spool
  +                        // (i.e. another thread has gotten to it first).
  +                        // In this case we simply continue to the next key
  +                        if (mail == null) {
  +                            continue;
  +                        }
  +                        return mail;
  +                    } catch (javax.mail.MessagingException e) {
  +                        getLogger().error("Exception during retrieve -- skipping item " + next.key, e);
  +                    }
                   }
               }
               //Nothing to do... sleep!
  -            if (sleepUntil == 0) {
  -                sleepUntil = System.currentTimeMillis() + WAIT_LIMIT;
  +            long wait_time = filter.getWaitTime();
  +            if (wait_time <= 0) {
  +                wait_time = WAIT_LIMIT;
               }
               try {
                   synchronized (this) {
  -                    long waitTime = sleepUntil - System.currentTimeMillis();
  -                    //StringBuffer errorBuffer =
  -                    //    new StringBuffer(128)
  -                    //            .append("waiting ")
  -                    //            .append((waitTime) / 1000L)
  -                    //            .append(" in ")
  -                    //            .append(repositoryName);
  -                    //System.err.println(errorBuffer.toString());
  -                    wait(waitTime <= 0 ? 1 : waitTime);
  +                    wait (wait_time);
                   }
               } catch (InterruptedException ex) {
                   throw ex;
  @@ -307,7 +354,8 @@
                       String key = rsListMessages.getString(1);
                       String state = rsListMessages.getString(2);
                       long lastUpdated = rsListMessages.getTimestamp(3).getTime();
  -                    pendingMessages.add(new PendingMessage(key, state, lastUpdated));
  +                    String errorMessage = rsListMessages.getString(4);
  +                    pendingMessages.add(new PendingMessage(key, state, lastUpdated, errorMessage));
                   }
               } catch (SQLException sqle) {
                   //Log it and avoid reloading for a bit
  @@ -328,11 +376,13 @@
           protected String key;
           protected String state;
           protected long lastUpdated;
  +        protected String errorMessage;
   
  -        public PendingMessage(String key, String state, long lastUpdated) {
  +        public PendingMessage(String key, String state, long lastUpdated, String errorMessage) {
               this.key = key;
               this.state = state;
               this.lastUpdated = lastUpdated;
  +            this.errorMessage = errorMessage;
           }
       }
   }
  
  
  
  No                   revision
  No                   revision
  1.2.4.4   +48 -6     james-server/src/java/org/apache/james/services/Attic/SpoolRepository.java
  
  Index: SpoolRepository.java
  ===================================================================
  RCS file: /home/cvs/james-server/src/java/org/apache/james/services/Attic/SpoolRepository.java,v
  retrieving revision 1.2.4.3
  retrieving revision 1.2.4.4
  diff -u -r1.2.4.3 -r1.2.4.4
  --- SpoolRepository.java	8 Mar 2003 21:54:06 -0000	1.2.4.3
  +++ SpoolRepository.java	16 Nov 2003 21:47:24 -0000	1.2.4.4
  @@ -58,6 +58,8 @@
   
   package org.apache.james.services;
   
  +import org.apache.mailet.Mail;
  +
   /**
    * Interface for a Repository for Spooling Mails.
    * A spool repository is a transitory repository which should empty itself 
  @@ -69,28 +71,68 @@
       extends MailRepository {
   
       /**
  +     * Implementations of AcceptFilter can be used to select which mails a SpoolRepository
  +     * implementation returns from its accept (AcceptFilter) method
  +     **/
  +    public static interface AcceptFilter
  +    {
  +        /**
  +         * This method is called by accept(Filter) to determine if the message is
  +         * ready for delivery.
  +         *
  +         * @param key message key
  +         * @param state the state of the message
  +         * @param lastUpdated the last time the message was written to the spool
  +         * @param errorMessage the current errorMessage
  +         * @return true if the message is ready for delivery
  +         **/
  +        boolean accept (String key, String state, long lastUpdated, String errorMessage) ;
  +
  +
  +        /**
  +         * This method allows the filter to determine how long the thread should wait for a
  +         * message to get ready for delivery, when currently there are none.
  +         * @return the time to wait for a message to get ready for delivery
  +         **/
  +        long getWaitTime ();
  +    }
  +    
  +    /**
        * Define a STREAM repository. Streams are stored in the specified
        * destination.
        */
       String SPOOL = "SPOOL";
   
       /**
  -     * Returns the key for an arbitrarily selected mail deposited in this Repository.
  +     * Returns an arbitrarily selected mail deposited in this Repository.
        * Usage: SpoolManager calls accept() to see if there are any unprocessed 
        * mails in the spool repository.
        *
  -     * @return the key for the mail
  +     * @return the mail
        */
  -    String accept() throws InterruptedException;
  +    Mail accept() throws InterruptedException;
   
       /**
  -     * Returns the key for an arbitrarily select mail deposited in this Repository that
  +     * Returns an arbitrarily select mail deposited in this Repository that
        * is either ready immediately for delivery, or is younger than it's last_updated plus
        * the number of failed attempts times the delay time.
        * Usage: RemoteDeliverySpool calls accept() with some delay and should block until an
        * unprocessed mail is available.
        *
  -     * @return the key for the mail
  +     * @return the mail
        */
  -    String accept(long delay) throws InterruptedException;
  +    Mail accept(long delay) throws InterruptedException;
  +
  +    /**
  +     * Returns an arbitrarily select mail deposited in this Repository for
  +     * which the supplied filter's accept method returns true.
  +     * Usage: RemoteDeliverySpool calls accept(filter) with some a filter which determines
  +     * based on number of retries if the mail is ready for processing.
  +     * If no message is ready the method will block until one is, the amount of time to block is
  +     * determined by calling the filters getWaitTime method.
  +     *
  +     * @return the mail
  +     */
  +    Mail accept(AcceptFilter filter) throws InterruptedException;
  +
   }
  
  
  
  No                   revision
  No                   revision
  1.20.4.12 +3 -9      james-server/src/java/org/apache/james/transport/JamesSpoolManager.java
  
  Index: JamesSpoolManager.java
  ===================================================================
  RCS file: /home/cvs/james-server/src/java/org/apache/james/transport/JamesSpoolManager.java,v
  retrieving revision 1.20.4.11
  retrieving revision 1.20.4.12
  diff -u -r1.20.4.11 -r1.20.4.12
  --- JamesSpoolManager.java	20 Oct 2003 08:18:29 -0000	1.20.4.11
  +++ JamesSpoolManager.java	16 Nov 2003 21:47:24 -0000	1.20.4.12
  @@ -350,14 +350,8 @@
           while(true) {
               String key = null;
               try {
  -                key = spool.accept();
  -                MailImpl mail = spool.retrieve(key);
  -                // Retrieve can return null if the mail is no longer on the spool
  -                // (i.e. another thread has gotten to it first).
  -                // In this case we simply continue to the next key
  -                if (mail == null) {
  -                    continue;
  -                }
  +                MailImpl mail = (MailImpl)spool.accept();
  +                key = mail.getName();
                   if (getLogger().isDebugEnabled()) {
                       StringBuffer debugBuffer =
                           new StringBuffer(64)
  
  
  
  No                   revision
  No                   revision
  1.33.4.14 +287 -12   james-server/src/java/org/apache/james/transport/mailets/RemoteDelivery.java
  
  Index: RemoteDelivery.java
  ===================================================================
  RCS file: /home/cvs/james-server/src/java/org/apache/james/transport/mailets/RemoteDelivery.java,v
  retrieving revision 1.33.4.13
  retrieving revision 1.33.4.14
  diff -u -r1.33.4.13 -r1.33.4.14
  --- RemoteDelivery.java	28 Aug 2003 16:22:37 -0000	1.33.4.13
  +++ RemoteDelivery.java	16 Nov 2003 21:47:24 -0000	1.33.4.14
  @@ -68,6 +68,7 @@
   import java.util.Collection;
   import java.util.Date;
   import java.util.Hashtable;
  +import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Locale;
   import java.util.Properties;
  @@ -98,12 +99,19 @@
   import org.apache.mailet.Mail;
   import org.apache.mailet.MailAddress;
   
  +import org.apache.oro.text.regex.MalformedPatternException;
  +import org.apache.oro.text.regex.Pattern;
  +import org.apache.oro.text.regex.Perl5Compiler;
  +import org.apache.oro.text.regex.Perl5Matcher;
  +import org.apache.oro.text.regex.MatchResult;
  +
  +
   /**
    * Receives a MessageContainer from JamesSpoolManager and takes care of delivery
    * the message to remote hosts. If for some reason mail can't be delivered
  - * store it in the "outgoing" Repository and set an Alarm. After "delayTime" the
  + * store it in the "outgoing" Repository and set an Alarm. After the next "delayTime" the
    * Alarm will wake the servlet that will try to send it again. After "maxRetries"
  - * the mail will be considered underiverable and will be returned to sender.
  + * the mail will be considered undeliverable and will be returned to sender.
    *
    * TO DO (in priority):
    * 1. Support a gateway (a single server where all mail will be delivered) (DONE)
  @@ -123,13 +131,109 @@
    */
   public class RemoteDelivery extends GenericMailet implements Runnable {
   
  +    private static final long DEFAULT_DELAY_TIME = 21600000; // default is 6*60*60*1000 millis (6 hours)
  +    private static final String PATTERN_STRING =
  +        "\\s*([0-9]*\\s*[\\*])?\\s*([0-9]+)\\s*([a-z,A-Z]*)\\s*";//pattern to match
  +                                                                 //[attempts*]delay[units]
  +                                            
  +    private static Pattern PATTERN = null; //the compiled pattern of the above String
  +    private static final HashMap MULTIPLIERS = new HashMap (10); //holds allowed units for delaytime together with
  +                                                                //the factor to turn it into the equivalent time in msec
  +
  +    /*
  +     * Static initializer.<p>
  +     * Compiles pattern for processing delaytime entries.<p>
  +     * Initializes MULTIPLIERS with the supported unit quantifiers
  +     */
  +    static {
  +        try {
  +            Perl5Compiler compiler = new Perl5Compiler(); 
  +            PATTERN = compiler.compile(PATTERN_STRING, Perl5Compiler.READ_ONLY_MASK);
  +        } catch(MalformedPatternException mpe) {
  +            //this should not happen as the pattern string is hardcoded.
  +            System.err.println ("Malformed pattern: " + PATTERN_STRING);
  +            mpe.printStackTrace (System.err);
  +        }
  +        //add allowed units and their respective multiplier
  +        MULTIPLIERS.put ("msec", new Integer (1));
  +        MULTIPLIERS.put ("msecs", new Integer (1));
  +        MULTIPLIERS.put ("sec",  new Integer (1000));
  +        MULTIPLIERS.put ("secs",  new Integer (1000));
  +        MULTIPLIERS.put ("minute", new Integer (1000*60));
  +        MULTIPLIERS.put ("minutes", new Integer (1000*60));
  +        MULTIPLIERS.put ("hour", new Integer (1000*60*60));
  +        MULTIPLIERS.put ("hours", new Integer (1000*60*60));
  +        MULTIPLIERS.put ("day", new Integer (1000*60*60*24));
  +        MULTIPLIERS.put ("days", new Integer (1000*60*60*24));
  +    }
  +    
  +    /**
  +     * This filter is used in the accept call to the spool.
  +     * It will select the next mail ready for processing according to the mails
  +     * retrycount and lastUpdated time
  +     **/
  +    private class MultipleDelayFilter implements SpoolRepository.AcceptFilter
  +    {
  +        /**
  +         * holds the time to wait for the youngest mail to get ready for processing
  +         **/
  +        long youngest = 0;
  +
  +        /**
  +         * Uses the getNextDelay to determine if a mail is ready for processing based on the delivered parameters
  +         * errorMessage (which holds the retrycount), lastUpdated and state
  +         * @param key the name/key of the message
  +         * @param state the mails state
  +         * @param lastUpdated the mail was last written to the spool at this time.
  +         * @param errorMessage actually holds the retrycount as a string (see failMessage below)
  +         **/
  +        public boolean accept (String key, String state, long lastUpdated, String errorMessage) {
  +            if (state.equals(Mail.ERROR)) {
  +                //Test the time...
  +                int retries = Integer.parseInt(errorMessage);
  +                long delay = getNextDelay (retries);
  +                long timeToProcess = delay + lastUpdated;
  +
  +                
  +                if (System.currentTimeMillis() > timeToProcess) {
  +                    //We're ready to process this again
  +                    return true;
  +                } else {
  +                    //We're not ready to process this.
  +                    if (youngest == 0 || youngest > timeToProcess) {
  +                        //Mark this as the next most likely possible mail to process
  +                        youngest = timeToProcess;
  +                    }
  +                    return false;
  +                }
  +            } else {
  +                //This mail is good to go... return the key
  +                return true;
  +            }
  +        }
  +
  +        /**
  +         * @return the optimal time the SpoolRepository.accept(AcceptFilter) method should wait before
  +         * trying to find a mail ready for processing again.
  +         **/
  +        public long getWaitTime () {
  +            if (youngest == 0) {
  +                return 0;
  +            } else {
  +                long duration = youngest - System.currentTimeMillis();
  +                youngest = 0; //get ready for next run
  +                return duration <= 0 ? 1 : duration;
  +            }
  +        }
  +    }
  +
       /**
        * Controls certain log messages
        */
       private boolean isDebug = false;
   
       private SpoolRepository outgoing; // The spool of outgoing mail
  -    private long delayTime = 21600000; // default is 6*60*60*1000 millis (6 hours)
  +    private long[] delayTimes; //holds expanded delayTimes
       private int maxRetries = 5; // default number of retries
       private long smtpTimeout = 600000;  //default number of ms to timeout on smtp delivery
       private boolean sendPartial = false; // If false then ANY address errors will cause the transmission to fail
  @@ -145,14 +249,28 @@
       private MailServer mailServer;
       private volatile boolean destroyed = false; //Flag that the run method will check and end itself if set to true
   
  +    private Perl5Matcher delayTimeMatcher; //matcher use at init time to parse delaytime parameters
  +    private MultipleDelayFilter delayFilter = new MultipleDelayFilter ();//used by accept to selcet the next mail ready for processing
  +    
       /**
        * Initialize the mailet
        */
       public void init() throws MessagingException {
           isDebug = (getInitParameter("debug") == null) ? false : new Boolean(getInitParameter("debug")).booleanValue();
  +        ArrayList delay_times_list = new ArrayList();
           try {
               if (getInitParameter("delayTime") != null) {
  -                delayTime = Long.parseLong(getInitParameter("delayTime"));
  +                delayTimeMatcher = new Perl5Matcher();
  +                String delay_times = getInitParameter("delayTime");
  +                //split on comma's
  +                StringTokenizer st = new StringTokenizer (delay_times,",");
  +                while (st.hasMoreTokens()) {
  +                    String delay_time = st.nextToken();
  +                    delay_times_list.add (new Delay(delay_time));
  +                }
  +            } else {
  +                //use default delayTime.
  +                delay_times_list.add (new Delay());
               }
           } catch (Exception e) {
               log("Invalid delayTime setting: " + getInitParameter("delayTime"));
  @@ -161,8 +279,30 @@
               if (getInitParameter("maxRetries") != null) {
                   maxRetries = Integer.parseInt(getInitParameter("maxRetries"));
               }
  +            //check consistency with delay_times_list attempts
  +            int total_attempts = calcTotalAttempts (delay_times_list);
  +            if (total_attempts > maxRetries) {
  +                log("Total number of delayTime attempts exceeds maxRetries specified. Increasing maxRetries from "+maxRetries+" to "+total_attempts);
  +                maxRetries = total_attempts;
  +            } else {
  +                int extra = maxRetries - total_attempts;
  +                if (extra != 0) {
  +                    log("maxRetries is larger than total number of attempts specified. Increasing last delayTime with "+extra+" attempts ");
  +
  +                    if (delay_times_list.size() != 0) { 
  +                        Delay delay = (Delay)delay_times_list.get (delay_times_list.size()-1); //last Delay
  +                        delay.setAttempts (delay.getAttempts()+extra);
  +                        log("Delay of "+delay.getDelayTime()+" msecs is now attempted: "+delay.getAttempts()+" times");
  +                    } else {
  +                        log ("NO, delaytimes cannot continue");
  +                    }
  +                }
  +            }
  +            delayTimes = expandDelays (delay_times_list);
  +            
           } catch (Exception e) {
               log("Invalid maxRetries setting: " + getInitParameter("maxRetries"));
  +            e.printStackTrace();
           }
           try {
               if (getInitParameter("timeout") != null) {
  @@ -778,7 +918,8 @@
           try {
               while (!Thread.currentThread().interrupted() && !destroyed) {
                   try {
  -                    String key = outgoing.accept(delayTime);
  +                    MailImpl mail = (MailImpl)outgoing.accept(delayFilter);
  +                    String key = mail.getName();
                       try {
                           if (isDebug) {
                               StringBuffer logMessageBuffer =
  @@ -788,12 +929,6 @@
                                           .append(key);
                               log(logMessageBuffer.toString());
                           }
  -                        MailImpl mail = outgoing.retrieve(key);
  -                        // Retrieve can return null if the mail is no longer on the outgoing spool.
  -                        // In this case we simply continue to the next key
  -                        if (mail == null) {
  -                            continue;
  -                        }
                           if (deliver(mail, session)) {
                               //Message was successfully delivered/fully failed... delete it
                               outgoing.remove(key);
  @@ -818,4 +953,144 @@
               Thread.currentThread().interrupted();
           }
       }
  +
  +    /**
  +     * @param list holding Delay objects
  +     * @return the total attempts for all delays
  +     **/
  +    private int calcTotalAttempts (ArrayList list) {
  +        int sum = 0;
  +        Iterator i = list.iterator();
  +        while (i.hasNext()) {
  +            Delay delay = (Delay)i.next();
  +            sum += delay.getAttempts();
  +        }
  +        return sum;
  +    }
  +    
  +    /**
  +     * This method expands an ArrayList containing Delay objects into an array holding the
  +     * only delaytime in the order.<p>
  +     * So if the list has 2 Delay objects the first having attempts=2 and delaytime 4000
  +     * the second having attempts=1 and delaytime=300000 will be expanded into this array:<p>
  +     * long[0] = 4000<p>
  +     * long[1] = 4000<p>
  +     * long[2] = 300000<p>
  +     * @param list the list to expand
  +     * @return the expanded list
  +     **/
  +    private long[] expandDelays (ArrayList list) {
  +        long[] delays = new long [calcTotalAttempts(list)];
  +        Iterator i = list.iterator();
  +        int idx = 0;
  +        while (i.hasNext()) {
  +            Delay delay = (Delay)i.next();
  +            for (int j=0; j<delay.getAttempts(); j++) {
  +                delays[idx++]= delay.getDelayTime();
  +            }            
  +        }
  +        return delays;
  +    }
  +    
  +    /**
  +     * This method returns, given a retry-count, the next delay time to use.
  +     * @param retry_count the current retry_count.
  +     * @return the next delay time to use, given the retry count
  +     **/
  +    private long getNextDelay (int retry_count) {
  +        return delayTimes[retry_count-1];
  +    }
  +
  +    /**
  +     * This class is used to hold a delay time and its corresponding number
  +     * of retries.
  +     **/
  +    private class Delay {
  +        private int attempts = 1;
  +        private long delayTime = DEFAULT_DELAY_TIME;
  +        
  +            
  +        /**
  +         * This constructor expects Strings of the form "[attempt\*]delaytime[unit]". <p>
  +         * The optional attempt is the number of tries this delay should be used (default = 1)
  +         * The unit if present must be one of (msec,sec,minute,hour,day) (default = msec)
  +         * The constructor multiplies the delaytime by the relevant multiplier for the unit,
  +         * so the delayTime instance variable is always in msec.
  +         * @param init_string the string to initialize this Delay object from
  +         **/
  +        public Delay (String init_string) throws MessagingException
  +        {
  +            String unit = "msec"; //default unit
  +            if (delayTimeMatcher.matches (init_string, PATTERN)) {
  +                MatchResult res = delayTimeMatcher.getMatch ();
  +                //the capturing groups will now hold
  +                //at 1:  attempts * (if present)
  +                //at 2:  delaytime
  +                //at 3:  unit (if present)
  +                
  +                if (res.group(1) != null && !res.group(1).equals ("")) {
  +                    //we have an attempt *
  +                    String attempt_match = res.group(1);
  +                    //strip the * and whitespace
  +                    attempt_match = attempt_match.substring (0,attempt_match.length()-1).trim();
  +                    attempts = Integer.parseInt (attempt_match);
  +                }
  +                
  +                delayTime = Long.parseLong (res.group(2));
  +                
  +                if (!res.group(3).equals ("")) {
  +                    //we have a unit
  +                    unit = res.group(3).toLowerCase();
  +                }
  +            } else {
  +                throw new MessagingException(init_string+" does not match "+PATTERN_STRING);
  +            }
  +            if (MULTIPLIERS.get (unit)!=null) {
  +                int multiplier = ((Integer)MULTIPLIERS.get (unit)).intValue();
  +                delayTime *= multiplier;
  +            } else {
  +                throw new MessagingException("Unknown unit: "+unit);
  +            }
  +        }
  +
  +        /**
  +         * This constructor makes a default Delay object, ie. attempts=1 and delayTime=DEFAULT_DELAY_TIME
  +         **/
  +        public Delay () {
  +        }
  +
  +        /**
  +         * @return the delayTime for this Delay
  +         **/
  +        public long getDelayTime () {
  +            return delayTime;
  +        }
  +
  +        /**
  +         * @return the number attempts this Delay should be used.
  +         **/
  +        public int getAttempts () {
  +            return attempts;
  +        }
  +        
  +        /**
  +         * Set the number attempts this Delay should be used.
  +         **/
  +        public void setAttempts (int value) {
  +            attempts = value;
  +        }
  +        
  +        /**
  +         * Pretty prints this Delay 
  +         **/
  +        public String toString () {
  +            StringBuffer buf = new StringBuffer(15);
  +            buf.append (getAttempts ());
  +            buf.append ('*');
  +            buf.append (getDelayTime());
  +            buf.append ("msec");
  +            return buf.toString();
  +        }
  +    }
  +    
   }
  
  
  
  No                   revision
  No                   revision
  1.5.4.6   +16 -3     james-server/src/xdocs/provided_mailets_2_1.xml
  
  Index: provided_mailets_2_1.xml
  ===================================================================
  RCS file: /home/cvs/james-server/src/xdocs/provided_mailets_2_1.xml,v
  retrieving revision 1.5.4.5
  retrieving revision 1.5.4.6
  diff -u -r1.5.4.5 -r1.5.4.6
  --- provided_mailets_2_1.xml	29 May 2003 20:39:44 -0000	1.5.4.5
  +++ provided_mailets_2_1.xml	16 Nov 2003 21:47:24 -0000	1.5.4.6
  @@ -162,10 +162,23 @@
   <ul>
   <li><strong>outgoing</strong> (required) - The URL for the repository that will hold messages being processed
   by the RemoteDelivery Mailet.</li>
  -<li><strong>delayTime</strong> (optional) - a non-negative Long value that is the time in
  -milliseconds between redelivery attempts for a particular mail.  Defaults to six hours.</li>
  +<li><strong>delayTime</strong> (optional) - takes the form
  +&lt;attempts*delay quantifier&gt; where attempts and quantifier are both
  +optional an defaults to 1 and miliseconds respectively.
  +The delay is a non-negative value that is the time between redelivery
  +attempts for a particular mail. Defaults to six hours. Allowed
  +quantifiers are msec(s),sec(s),minute(s),hour(s),day(s). The attempts
  +is the number of times the specified delay is used. It is possible to
  +have multiple entries of this parameter, and if so the the delayTimes
  +are used in the order specified, this allows you to try a few times
  +in rapid succession and then move on to longer delays</li>
   <li><strong>maxRetries</strong> (optional) - a non-negative Integer value that is number of times
  -the Mailet will attempt to deliver a particular mail.  Defaults to five.</li>
  +the Mailet will attempt to deliver a particular mail.  If inconsistent
  +with number of attempts in <strong>delayTime</strong>
  +specifications. <strong>maxRetries<strong> will be increased if it is
  +less than the total number of attempts in <strong>delayTime</strong>,
  +if it is more the last specified <strong>delayTime</strong> is used
  +for the missing delays. Defaults to five. </li>
   <li><strong>timeout</strong> (optional) - The SMTP connection timeout for SMTP connections generated
   by this Mailet.  Defaults to 60 seconds.</li>
   <li><strong>deliveryThreads</strong> (optional) - The number of threads this Mailet will use to generate
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org