You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/02/22 15:44:03 UTC

svn commit: r912589 - in /james/server/trunk: spoolmanager/src/main/java/org/apache/james/transport/ spoolmanager/src/main/java/org/apache/james/transport/camel/ spring-deployment/src/main/config/james/

Author: norman
Date: Mon Feb 22 14:44:02 2010
New Revision: 912589

URL: http://svn.apache.org/viewvc?rev=912589&view=rev
Log:
Commit missing pieces for using Camel as SpoolManager/MailetProcessor replacement. James now uses Camel with this configuration (JAMES-971)

Added:
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java
Modified:
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java
    james/server/trunk/spring-deployment/src/main/config/james/log4j.properties
    james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java Mon Feb 22 14:44:02 2010
@@ -21,8 +21,6 @@
 
 package org.apache.james.transport;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -47,7 +45,6 @@
 import org.apache.mailet.MailAddress;
 import org.apache.mailet.Mailet;
 import org.apache.mailet.MailetConfig;
-import org.apache.mailet.MailetException;
 import org.apache.mailet.Matcher;
 import org.apache.mailet.MatcherConfig;
 import org.apache.mailet.base.GenericMailet;
@@ -392,7 +389,7 @@
                     recipients = new ArrayList<MailAddress>(0);
                 } else if (recipients != mail.getRecipients()) {
                     //Make sure all the objects are MailAddress objects
-                    verifyMailAddresses(recipients);
+                    ProcessorUtil.verifyMailAddresses(recipients);
                 }
             } catch (MessagingException me) {
                 // look in the matcher's mailet's init attributes
@@ -410,7 +407,7 @@
                     recipients = mail.getRecipients();
                     // no need to verify addresses
                 } else {
-                    handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException);
+                    ProcessorUtil.handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException, logger);
                 }
             }
 
@@ -455,7 +452,7 @@
             try {
                 mailet.service(mail);
                 // Make sure all the recipients are still MailAddress objects
-                verifyMailAddresses(mail.getRecipients());
+                ProcessorUtil.verifyMailAddresses(mail.getRecipients());
             } catch (MessagingException me) {
                 MailetConfig mailetConfig = mailet.getMailetConfig();
                 String onMailetException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMailetException");
@@ -467,9 +464,9 @@
                 if (onMailetException.compareTo("ignore") == 0) {
                     // ignore the exception and continue
                     // this option should not be used if the mail object can be changed by the mailet
-                    verifyMailAddresses(mail.getRecipients());
+                    ProcessorUtil.verifyMailAddresses(mail.getRecipients());
                 } else {
-                    handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException);
+                    ProcessorUtil.handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException, logger);
                 }
             }
 
@@ -503,64 +500,9 @@
     }
 
 
-    /**
-     * Checks that all objects in this class are of the form MailAddress.
-     *
-     * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
-     */
-    private void verifyMailAddresses(Collection<MailAddress> col) throws MessagingException {
-        try {
-            MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
-
-            // Why is this here?  According to the javadoc for
-            // java.util.Collection.toArray(Object[]), this should
-            // never happen.  The exception will be thrown.
-            if (addresses.length != col.size()) {
-                throw new MailetException("The recipient list contains objects other than MailAddress objects");
-            }
-        } catch (ArrayStoreException ase) {
-            throw new MailetException("The recipient list contains objects other than MailAddress objects");
-        }
-    }
+   
 
-    /**
-     * This is a helper method that updates the state of the mail object to
-     * Mail.ERROR as well as recording the exception to the log
-     *
-     * @param me the exception to be handled
-     * @param mail the mail being processed when the exception was generated
-     * @param offendersName the matcher or mailet than generated the exception
-     * @param nextState the next state to set
-     *
-     * @throws MessagingException thrown always, rethrowing the passed in exception
-     */
-    private void handleException(MessagingException me, Mail mail, String offendersName, String nextState) throws MessagingException {
-        System.err.println("exception! " + me);
-        mail.setState(nextState);
-        StringWriter sout = new StringWriter();
-        PrintWriter out = new PrintWriter(sout, true);
-        StringBuffer exceptionBuffer =
-            new StringBuffer(128)
-                    .append("Exception calling ")
-                    .append(offendersName)
-                    .append(": ")
-                    .append(me.getMessage());
-        out.println(exceptionBuffer.toString());
-        Exception e = me;
-        while (e != null) {
-            e.printStackTrace(out);
-            if (e instanceof MessagingException) {
-                e = ((MessagingException)e).getNextException();
-            } else {
-                e = null;
-            }
-        }
-        String errorString = sout.toString();
-        mail.setErrorMessage(errorString);
-        logger.error(errorString);
-        throw me;
-    }
-    
+  
     /**
      * <p>Initialize the processor matcher/mailet list.</p>
      */

Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java?rev=912589&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java Mon Feb 22 14:44:02 2010
@@ -0,0 +1,93 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+
+package org.apache.james.transport;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collection;
+
+import javax.mail.MessagingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailAddress;
+import org.apache.mailet.MailetException;
+
+public class ProcessorUtil {
+
+    /**
+     * This is a helper method that updates the state of the mail object to
+     * the given next state as well as recording the exception to the log
+     *
+     * @param me the exception to be handled
+     * @param mail the mail being processed when the exception was generated
+     * @param offendersName the matcher or mailet that generated the exception
+     * @param nextState the next state to set
+     *
+     * @throws MessagingException thrown always, rethrowing the passed in exception
+     */
+    public static void handleException(MessagingException me, Mail mail, String offendersName, String nextState, Log logger) throws MessagingException {
+        System.err.println("exception! " + me);
+        mail.setState(nextState);
+        StringWriter sout = new StringWriter();
+        PrintWriter out = new PrintWriter(sout, true);
+        StringBuffer exceptionBuffer =
+            new StringBuffer(128)
+                    .append("Exception calling ")
+                    .append(offendersName)
+                    .append(": ")
+                    .append(me.getMessage());
+        out.println(exceptionBuffer.toString());
+        Exception e = me;
+        while (e != null) {
+            e.printStackTrace(out);
+            if (e instanceof MessagingException) {
+                e = ((MessagingException)e).getNextException();
+            } else {
+                e = null;
+            }
+        }
+        String errorString = sout.toString();
+        mail.setErrorMessage(errorString);
+        logger.error(errorString);
+        throw me;
+    }
+    
+    /**
+     * Checks that all objects in the given collection are MailAddress instances.
+     *
+     * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
+     */
+    public static void verifyMailAddresses(Collection<MailAddress> col) throws MessagingException {
+        try {
+            MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
+
+            // Why is this here?  According to the javadoc for
+            // java.util.Collection.toArray(Object[]), this should
+            // never happen.  The exception will be thrown.
+            if (addresses.length != col.size()) {
+                throw new MailetException("The recipient list contains objects other than MailAddress objects");
+            }
+        } catch (ArrayStoreException ase) {
+            throw new MailetException("The recipient list contains objects other than MailAddress objects");
+        }
+    }
+}

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java Mon Feb 22 14:44:02 2010
@@ -30,6 +30,12 @@
     public final static String STATE = "mailstate";
 
     private Mail mail;
+    
+    /**
+     * 
+     * @param mail the mail message to use as Body of the message
+     * 
+     */
     public MailMessage(Mail mail) {
         this.mail = mail;
     }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java Mon Feb 22 14:44:02 2010
@@ -20,13 +20,16 @@
 package org.apache.james.transport.camel;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import javax.mail.MessagingException;
 
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.model.ChoiceDefinition;
 import org.apache.commons.configuration.ConfigurationException;
@@ -34,27 +37,37 @@
 import org.apache.commons.logging.Log;
 import org.apache.james.lifecycle.Configurable;
 import org.apache.james.lifecycle.LogEnabled;
+import org.apache.james.services.SpoolManager;
+import org.apache.james.transport.MailetConfigImpl;
 import org.apache.james.transport.MailetLoader;
+import org.apache.james.transport.MatcherConfigImpl;
 import org.apache.james.transport.MatcherLoader;
+import org.apache.mailet.Mail;
 import org.apache.mailet.Mailet;
+import org.apache.mailet.MailetConfig;
 import org.apache.mailet.Matcher;
+import org.apache.mailet.MatcherConfig;
+import org.apache.mailet.base.GenericMailet;
 import org.apache.mailet.base.MatcherInverter;
 
 /**
  * Build up the Camel Route by parsing the spoolmanager.xml configuration file. 
  * 
- * @author norman
- *
+ * TODO:  - Limit Threads
  */
-public class MailProcessorRouteBuilder extends RouteBuilder implements Configurable, LogEnabled {
+public class MailProcessorRouteBuilder extends RouteBuilder implements SpoolManager, Configurable, LogEnabled {
 
     private MatcherLoader matcherLoader;
     private HierarchicalConfiguration config;
     private MailetLoader mailetLoader;
     private Log logger;
 
-    private List<Mailet> mailets = new ArrayList<Mailet>();
+    private final Map<String,List<Mailet>> mailets = new HashMap<String,List<Mailet>>();
+    private final Map<String,List<Matcher>> matchers = new HashMap<String,List<Matcher>>();
+
+    private final List<String> processors = new ArrayList<String>();
     
+       
     @Resource(name = "matcherpackages")
     public void setMatcherLoader(MatcherLoader matcherLoader) {
         this.matcherLoader = matcherLoader;
@@ -73,16 +86,26 @@
     @Override
     public void configure() throws Exception {
 
-        
-        ChoiceDefinition spoolDef = from("spool://spoolRepository").choice();
+        Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
+
+        // start to consume from the spool
+        ChoiceDefinition spoolDef = from("spool://spoolRepository")
+       
+        // start first choice
+        .choice();
         
         List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
         for (int i = 0; i < processorConfs.size(); i++) {
             final HierarchicalConfiguration processorConf = processorConfs.get(i);
             String processorName = processorConf.getString("[@name]");
 
+            
+            processors.add(processorName);
+            mailets.put(processorName, new ArrayList<Mailet>());
+            matchers.put(processorName, new ArrayList<Matcher>());
+
             // Check which route we need to go
-            ChoiceDefinition processorDef = spoolDef.when(header(MailMessage.STATE).isEqualTo(processorName)).setHeader("currentProcessor", constant(processorName));
+            ChoiceDefinition processorDef = spoolDef.when(header(MailMessage.STATE).isEqualTo(processorName));
             
             final List<HierarchicalConfiguration> mailetConfs = processorConf.configurationsAt("mailet");
             // Loop through the mailet configuration, load
@@ -152,42 +175,113 @@
                     throw new ConfigurationException("Unable to init mailet", ex);
                 }
                 if (mailet != null && matcher != null) {
-                    // Store the matcher to use for splitter
-                    processorDef.setHeader(MatcherSplitter.MATCHER_HEADER, constant(matcher))
-                        // do splitting
-                        .split().method(MatcherSplitter.class)
-                            // check if we need to execute the mailet and remove headers
-                            .choice().when(header(MatcherSplitter.MATCHER_MATCHED_HEADER).isEqualTo(true)).process(new MailetProcessor(mailet)).removeHeader(MatcherSplitter.MATCHER_MATCHED_HEADER)
+                    String onMatchException = null;
+                    MailetConfig mailetConfig = mailet.getMailetConfig();
+                    
+                    if (mailetConfig instanceof MailetConfigImpl) {
+                        onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
+                    }
+                    
+                    // Store the matcher to use for splitter in properties
+                    processorDef.setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher))
+                    
+                            // store the config in properties
+                            .setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException))
+                            
+                            // store the logger in properties
+                            .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger))
+
+                            // do splitting of the mail based on the stored matcher
+                            .split().method(MatcherSplitter.class)
+                            
+                            // start first choice
+                            .choice()
+                            
+                            // check if the state of the mail is the same as the
+                            // current processor. If not, we can just store it to the spool and stop the route processing
+                            .when(header(MailMessage.STATE).isNotEqualTo(processorName)).to("spool://spoolRepository").stop()
+                            
+                            // if not continue the route
+                            .otherwise()
+                            
+                            // start second choice
+                            .choice()
+                            
+                            // check if we need to execute the mailet. If so, execute it and remove the header at the end
+                            .when(header(MatcherSplitter.MATCHER_MATCHED_HEADER).isEqualTo("true")).process(new MailetProcessor(mailet, logger)).removeHeader(MatcherSplitter.MATCHER_MATCHED_HEADER)
+                            
+                            // end second choice
+                            .end()
+                            
+                            // end first choice
                             .end()
-                        .end()
-                    // remove matcher from header
-                    .removeHeader(MatcherSplitter.MATCHER_HEADER);
+                            
+                            // remove matcher from properties
+                            .removeProperty(MatcherSplitter.MATCHER_PROPERTY)
+
+                            // remove config from properties
+                            .removeProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY)
                     
-                    // store mailet for later destroy on shutdown
-                    mailets.add(mailet);
+                            // remove logger from properties
+                            .removeProperty(MatcherSplitter.LOGGER_PROPERTY);
+
+                    // store mailet and matcher
+                    mailets.get(processorName).add(mailet);
+                    matchers.get(processorName).add(matcher);
                 }
               
 
             }
-            processorDef.to("spool://spoolRepository");
+            
+            processorDef
+                    // start choice
+                    .choice()
+                    
+                    // when the mail state has not changed by the end of the route we need to call the TerminatingMailet to
+                    // make sure we don't fall into an endless loop
+                    .when(header(MailMessage.STATE).isEqualTo(processorName)).process(terminatingMailetProcessor)
+                    
+                    // end the choice
+                    .end()
+                    
+                    // route everything to the spool now
+                    .to("spool://spoolRepository");
         }
-
-        // just use a mock for now
-        spoolDef.end();
         
     }
 
+    /**
+     * Destroy all mailets and matchers
+     */
     @PreDestroy
     public void dispose() {
-        Iterator<Mailet> it = mailets.iterator();
         boolean debugEnabled = logger.isDebugEnabled();
+
+        Iterator<List<Mailet>> it = mailets.values().iterator();
         while (it.hasNext()) {
-            Mailet mailet = it.next();
-            if (debugEnabled) {
-                logger.debug("Shutdown mailet " + mailet.getMailetInfo());
+            List<Mailet> mList = it.next();
+            for (int i = 0; i < mList.size(); i++) {
+                Mailet m = mList.get(i);
+                if (debugEnabled) {
+                    logger.debug("Shutdown mailet " + m.getMailetInfo());
+                }
+                m.destroy();
             }
-            mailet.destroy();
+           
         }
+        
+        Iterator<List<Matcher>> mit = matchers.values().iterator();     
+        while (mit.hasNext()) {
+            List<Matcher> mList = mit.next();
+            for (int i = 0; i < mList.size(); i++) {
+                Matcher m = mList.get(i);
+                if (debugEnabled) {
+                    logger.debug("Shutdown matcher " + m.getMatcherInfo());
+                }
+                m.destroy();
+            }
+           
+        }      
     }
     
     /*
@@ -207,4 +301,91 @@
 
     }
 
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.services.SpoolManager#getMailetConfigs(java.lang.String)
+     */
+    public List<MailetConfig> getMailetConfigs(String processorName) {
+        List<MailetConfig> mailetConfigs = new ArrayList<MailetConfig>();
+        Iterator<Mailet> iterator = mailets.get(processorName).iterator();
+        while (iterator.hasNext()) {
+            Mailet mailet = (Mailet) iterator.next();
+            MailetConfig mailetConfig = mailet.getMailetConfig();
+            if (mailetConfig == null) mailetConfigs.add(new MailetConfigImpl()); // placeholder
+            else mailetConfigs.add(mailetConfig);
+        }
+        return mailetConfigs; 
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.services.SpoolManager#getMatcherConfigs(java.lang.String)
+     */
+    public List<MatcherConfig> getMatcherConfigs(String processorName) {
+        List<MatcherConfig> matcherConfigs = new ArrayList<MatcherConfig>();
+        Iterator<Matcher> iterator = matchers.get(processorName).iterator();
+        while (iterator.hasNext()) {
+            Matcher matcher = (Matcher) iterator.next();
+            MatcherConfig matcherConfig = matcher.getMatcherConfig();
+            if (matcherConfig == null) matcherConfigs.add(new MatcherConfigImpl()); // placeholder
+            else matcherConfigs.add(matcherConfig);
+        }      
+        return matcherConfigs;
+    }
+
+    /*
+     * (non-Javadoc)
+     * @see org.apache.james.services.SpoolManager#getProcessorNames()
+     */
+    public String[] getProcessorNames() {
+        return processors.toArray(new String[processors.size()]);
+    }
+
+    
+    /**
+     * Mailet which protects us from falling into an endless loop caused by a configuration error
+     *
+     */
+    private final class TerminatingMailet extends GenericMailet {
+        /**
+         *  The name of the mailet used to terminate the mailet chain.  The
+         *  end of the matcher/mailet chain must be a matcher that matches
+         *  all mails and a mailet that sets every mail to GHOST status.
+         *  This is necessary to ensure that mails are removed from the spool
+         *  in an orderly fashion.
+         */
+        private static final String TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
+
+        /*
+         * (non-Javadoc)
+         * @see org.apache.mailet.base.GenericMailet#service(org.apache.mailet.Mail)
+         */
+        public void service(Mail mail) {
+            if (!(Mail.ERROR.equals(mail.getState()))) {
+                // Don't complain if we fall off the end of the
+                // error processor.  That is currently the
+                // normal situation for James, and the message
+                // will show up in the error store.
+                StringBuffer warnBuffer = new StringBuffer(256)
+                                      .append("Message ")
+                                      .append(mail.getName())
+                                      .append(" reached the end of this processor, and is automatically deleted.  This may indicate a configuration error.");
+                logger.warn(warnBuffer.toString());
+            }
+            
+            // Set the mail to ghost state
+            mail.setState(Mail.GHOST);
+        }
+    
+        @Override
+        public String getMailetInfo() {
+            return getMailetName();
+        }
+    
+        @Override
+        public String getMailetName() {
+            return TERMINATING_MAILET_NAME;
+        }
+    }
+
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java Mon Feb 22 14:44:02 2010
@@ -1,9 +1,35 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
 package org.apache.james.transport.camel;
 
+import java.util.Locale;
+
+import javax.mail.MessagingException;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.james.transport.MailetConfigImpl;
+import org.apache.james.transport.ProcessorUtil;
 import org.apache.mailet.Mail;
 import org.apache.mailet.Mailet;
+import org.apache.mailet.MailetConfig;
 
 /**
  * Mailet wrapper which execute a Mailet in a Processor
@@ -12,23 +38,47 @@
 public class MailetProcessor implements Processor{
 
     private Mailet mailet;
+    private Log logger;
    
-    public MailetProcessor(Mailet mailet) {
+    /**
+     * Mailet to call on process
+     * 
+     * @param mailet
+     */
+    public MailetProcessor(Mailet mailet, Log logger) {
         this.mailet = mailet;
+        this.logger = logger;
     }
     
-    /*
-     * (non-Javadoc)
-     * @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
+    /**
+     * Call the wrapped mailet for the exchange
      */
+    @SuppressWarnings("unchecked")
     public void process(Exchange exchange) throws Exception {
+        Mail mail = (Mail) exchange.getIn().getBody();
         try {
-            mailet.service((Mail)exchange.getIn().getBody());
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw e;
+            mailet.service(mail);
+        } catch (MessagingException me) {
+            String onMailetException = null;
+            
+            MailetConfig mailetConfig = mailet.getMailetConfig();
+            if (mailetConfig instanceof MailetConfigImpl) {
+                onMailetException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMailetException");
+            }
+            if (onMailetException == null) {
+                onMailetException = Mail.ERROR;
+            } else {
+                onMailetException = onMailetException.trim().toLowerCase(Locale.US);
+            }
+            if (onMailetException.compareTo("ignore") == 0) {
+                // ignore the exception and continue
+                // this option should not be used if the mail object can be
+                // changed by the mailet
+                ProcessorUtil.verifyMailAddresses(mail.getRecipients());
+            } else {
+                ProcessorUtil.handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException, logger);
+            }
         }
-                
     }
 
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java Mon Feb 22 14:44:02 2010
@@ -23,13 +23,16 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 
 import javax.mail.MessagingException;
 
 import org.apache.camel.Body;
-import org.apache.camel.Header;
 import org.apache.camel.InOnly;
+import org.apache.camel.Property;
+import org.apache.commons.logging.Log;
 import org.apache.james.core.MailImpl;
+import org.apache.james.transport.ProcessorUtil;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailAddress;
 import org.apache.mailet.Matcher;
@@ -43,10 +46,21 @@
 @InOnly
 public class MatcherSplitter {
     
+    /**
+     * Header name which is used to indicate that the matcher matched
+     */
     public final static String MATCHER_MATCHED_HEADER = "matched";
     
-    public final static String MATCHER_HEADER = "matcher";
+    /**
+     * Property name under which the matcher is stored
+     */
+    public final static String MATCHER_PROPERTY = "matcher";
 
+    public final static String ON_MATCH_EXCEPTION_PROPERTY = "onMatchException";
+    
+    public final static String LOGGER_PROPERTY = "logger";
+
+    
     /**
      * Generate a List of MailMessage instances for the give @Body. This is done by using the given Matcher to see if we need more then one instance of the 
      * MailMessage
@@ -57,44 +71,77 @@
      * @throws MessagingException
      */
     @SuppressWarnings("unchecked")
-    public List<MailMessage> split(@Header(MATCHER_HEADER) Matcher matcher,@Body Mail mail) throws MessagingException {
+    public List<MailMessage> split(@Property(MATCHER_PROPERTY) Matcher matcher, @Property(ON_MATCH_EXCEPTION_PROPERTY) String onMatchException, @Property(LOGGER_PROPERTY) Log logger, @Body Mail mail) throws MessagingException {
         List<MailMessage> mails = new ArrayList<MailMessage>();
         boolean fullMatch = false;
         
+        // call the matcher
+        Collection<MailAddress> matchedRcpts = null;
         
-        Collection<MailAddress> matchedRcpts = matcher.match(mail);
+        try {
+            matchedRcpts = matcher.match(mail);
+            if (matchedRcpts == null) {
+                //In case the matcher returned null, create an empty Collection
+                matchedRcpts = new ArrayList<MailAddress>(0);
+            } else if (matchedRcpts != mail.getRecipients()) {
+                //Make sure all the objects are MailAddress objects
+                ProcessorUtil.verifyMailAddresses(matchedRcpts);
+            }
+        } catch (MessagingException me) {
+        
+            if (onMatchException == null) {
+                onMatchException = Mail.ERROR;
+            } else {
+                onMatchException = onMatchException.trim().toLowerCase(Locale.US);
+            }
+            if (onMatchException.compareTo("nomatch") == 0) {
+                // The matcher threw and onMatchException is "nomatch": treat it as if no recipients matched
+                matchedRcpts = new ArrayList<MailAddress>(0);
+            } else if (onMatchException.compareTo("matchall") == 0) {
+                matchedRcpts = mail.getRecipients();
+                // no need to verify addresses
+            } else {
+                ProcessorUtil.handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException, logger);
+            }
+        }
         
         // check if the matcher matched
-        if (matchedRcpts != null &&  matchedRcpts.isEmpty() == false) {
+        if ( matchedRcpts != null && matchedRcpts.isEmpty() == false) {
+            List<MailAddress> rcpts = new ArrayList<MailAddress>(mail.getRecipients());
+            
+            Iterator<MailAddress> rcptsIterator = matchedRcpts.iterator();
+            
+            while(rcptsIterator.hasNext()) {
+                // loop through the matched recipients and remove each of them from the remaining recipients
+                rcpts.remove(rcptsIterator.next());
+            }
             
-            // check if we need to create another instance of the mail. This is only needed if not all
-            // recipients matched 
-            if (matchedRcpts.equals(mail.getRecipients()) == false) {
+            if (rcpts.isEmpty()) {
+                // all recipients matched
+                fullMatch = true;
+            } else {
+                mail.setRecipients(rcpts);
+                
                 Mail newMail = new MailImpl(mail);
                 newMail.setRecipients(matchedRcpts);
-            
-                MailMessage newmsg = new MailMessage(newMail);
                 
+                MailMessage newmsg = new MailMessage(newMail);
+                    
                 // Set a header because the matcher matched. This can be used later when processing the route
                 newmsg.setHeader(MATCHER_MATCHED_HEADER, true);
+                
+                // add the new generated mail to the mails list
                 mails.add(newmsg);
-            
-                List<MailAddress> rcpts = new ArrayList<MailAddress>(mail.getRecipients());
-                Iterator<MailAddress> rcptsIterator = newMail.getRecipients().iterator();
-                while(rcptsIterator.hasNext()) {
-                    rcpts.remove(rcptsIterator.next());
-                }
-                mail.setRecipients(rcpts);
-            } else {
-                // all recipients matched
-                fullMatch = true;
             }
         }
+            
         MailMessage mailMsg = new MailMessage(mail);
         if (fullMatch) {
             // Set a header because the matcher matched. This can be used later when processing the route
             mailMsg.setHeader(MATCHER_MATCHED_HEADER, true);
         }
+        
+        // add mailMsg to the mails list
         mails.add(mailMsg);
         
         return mails;

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java Mon Feb 22 14:44:02 2010
@@ -19,7 +19,6 @@
 
 package org.apache.james.transport.camel;
 
-import java.util.List;
 import java.util.Map;
 
 import javax.annotation.Resource;
@@ -28,12 +27,14 @@
 import org.apache.camel.impl.DefaultComponent;
 import org.apache.commons.logging.Log;
 import org.apache.james.lifecycle.LogEnabled;
-import org.apache.james.services.SpoolManager;
 import org.apache.james.services.SpoolRepository;
-import org.apache.mailet.MailetConfig;
-import org.apache.mailet.MatcherConfig;
 
-public class SpoolComponent extends DefaultComponent implements SpoolManager, LogEnabled {
+/**
+ * Component which acts as SpoolEndPoint factory
+ * 
+ *
+ */
+public class SpoolComponent extends DefaultComponent implements LogEnabled {
 
     private SpoolRepository spool;
     private Log log;
@@ -49,21 +50,6 @@
         return new SpoolEndPoint(uri,this, spool, log);
     }
 
-    public List<MailetConfig> getMailetConfigs(String processorName) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    public List<MatcherConfig> getMatcherConfigs(String processorName) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    public String[] getProcessorNames() {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
     /*
      * (non-Javadoc)
      * @see org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java Mon Feb 22 14:44:02 2010
@@ -19,6 +19,10 @@
 
 package org.apache.james.transport.camel;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
@@ -27,12 +31,19 @@
 import org.apache.james.services.SpoolRepository;
 import org.apache.mailet.Mail;
 
+/**
+ * Consumer implementation which consumes Mail objects from the SpoolRepository
+ * 
+ *
+ */
 public class SpoolConsumer extends DefaultConsumer{
 
 
     private SpoolRepository spool;
     private Processor processor;
     private Log log;
+    private ExecutorService executor = Executors.newSingleThreadExecutor();
+    private Future<?> f;
     
     public SpoolConsumer(DefaultEndpoint endpoint, Processor processor, SpoolRepository spool, Log log) {
         super(endpoint, processor);
@@ -42,18 +53,21 @@
     }
 
 
-    public Exchange receive() {
+    /**
+     * Receive a Mail object from the SpoolRepository when one is ready. This method will block until a Mail object is ready to process
+     * 
+     * @return exchange the exchange which holds the MailMessage built from the Mail
+     */
+    private Exchange receive() {
         Exchange ex = getEndpoint().createExchange();        
         Mail mail;
         try {
             mail = spool.accept();
             ex.setIn(new MailMessage(mail));
+            log.debug("Mail " + mail.toString() + " ready for process. Start to process exchange " +ex);
             processor.process(ex);
         } catch (Exception e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
             ex.setException(e);
-            
         }
         return ex;
 
@@ -63,7 +77,7 @@
     @Override
     protected void doStart() throws Exception {
         super.doStart();
-        Thread t = new Thread(new Runnable() {
+        f  = executor.submit(new Runnable() {
 
             public void run() {
                 while(true) {
@@ -72,15 +86,17 @@
             }
             
         });
-        t.setDaemon(true);
-        t.start();
     }
 
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
-        
+        try {
+            f.cancel(true);
+        } catch (Exception e) {
+            // ignore
+        }
     }
     
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java Mon Feb 22 14:44:02 2010
@@ -26,6 +26,10 @@
 import org.apache.commons.logging.Log;
 import org.apache.james.services.SpoolRepository;
 
+/**
+ * Endpoint which handles Producer and Consumer for SpoolRepositories
+ *
+ */
 public class SpoolEndPoint extends DefaultEndpoint{
 
     private SpoolRepository spool;
@@ -61,9 +65,7 @@
      * @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor)
      */
     public Consumer createConsumer(Processor processor) throws Exception {
-        SpoolConsumer consumer = new SpoolConsumer(this,processor,spool,log);
-        return consumer;
-        
+        return new SpoolConsumer(this,processor,spool,log);        
     }
 
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java Mon Feb 22 14:44:02 2010
@@ -27,6 +27,11 @@
 import org.apache.james.services.SpoolRepository;
 import org.apache.mailet.Mail;
 
+/**
+ * Producer which handles storing and removing of Mails. If a Mail has the state Mail.GHOST or has no recipients it will get removed from the spool, otherwise it will get stored 
+ * back to the spool
+ *
+ */
 public class SpoolProducer extends DefaultProducer {
 
     private SpoolRepository spool;
@@ -44,7 +49,6 @@
      * @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
      */
     public void process(Exchange exchange) throws Exception {
-        // Exchange newExchange = getEndpoint().createExchange(exchange);
         Mail mail = (Mail) exchange.getIn().getBody();
 
         // Only remove an email from the spool is processing is
@@ -55,11 +59,12 @@
                 StringBuffer debugBuffer = new StringBuffer(64).append("==== Removed from spool mail ").append(mail.getName()).append("====");
                 log.debug(debugBuffer.toString());
             }
+            // Dispose mail 
             LifecycleUtil.dispose(mail);
 
         } else {
-
             try {
+                // store mail back to spool
                 spool.store(mail);
             } catch (Exception e) {
                 e.printStackTrace();

Modified: james/server/trunk/spring-deployment/src/main/config/james/log4j.properties
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/log4j.properties?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/log4j.properties (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/log4j.properties Mon Feb 22 14:44:02 2010
@@ -129,7 +129,8 @@
 
 
 
-
+# logger for camel
+log4j.logger.org.apache.camel=WARN, CONS, FILE
 log4j.logger.org.springframework=WARN, CONS, FILE
 log4j.logger.org.apache.james=DEBUG, CONS, FILE
 #log4j.logger.james: set from default value WARN to INFO or even DEBUG to see (even) more logging

Modified: james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Mon Feb 22 14:44:02 2010
@@ -1,5 +1,4 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN 2.0//EN" "http://www.springframework.org/dtd/spring-beans-2.0.dtd">
 
 	<!--
 		! Licensed to the Apache Software Foundation (ASF) under one ! ! or
@@ -17,7 +16,13 @@
 	-->
 
 
-<beans>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:camel="http://camel.apache.org/schema/spring"
+       xsi:schemaLocation="
+          http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
 
 	<!--
 		** JMX part ** to enable exposure of JMX, activate the following beans
@@ -76,7 +81,7 @@
 			    <entry key="smtpProtocolHandlerChain" value="smtpserver"/>
 				<entry key="pop3ProtocolHandlerChain" value="pop3server"/>
 				<entry key="remoteProtocolHandlerChain" value="remotemanager"/>		
-				<entry key="mailProcessor" value="spoolmanager"/>
+				<entry key="spool" value="spoolmanager"/>
 	        </map>
 		</property>
 	</bean>
@@ -96,7 +101,7 @@
 				<entry key="smtpProtocolHandlerChain" value="smtpserver"/>
 				<entry key="pop3ProtocolHandlerChain" value="pop3server"/>
 				<entry key="remoteProtocolHandlerChain" value="remoteManager"/>
-				<entry key="mailProcessor" value="spoolmanager"/>
+				<entry key="spool" value="spoolmanager"/>				
 			</map>
 		</property>
 	</bean>
@@ -107,22 +112,33 @@
 
 	<bean id="instanceFactory" class="org.apache.james.container.spring.SpringInstanceFactory"/>
 	
-
     <!-- Add support for Persistence annotations -->
     <bean class="org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor"/>
 
+    <!-- Change trace to true for debugging purposes -->
+    <camelContext id="jamesCamelContext" xmlns="http://camel.apache.org/schema/spring" trace="false">
+        <routeBuilder ref="processorRoute" />    
+    </camelContext>
+  
+    <!-- Build the camelroute from the spoolmanager.xml -->
+    <bean id="spoolmanager" name="processorRoute" class="org.apache.james.transport.camel.MailProcessorRouteBuilder"/>
+
+    <!-- The component for the spool which handles consuming and producing -->    
+	<bean id="spool" class="org.apache.james.transport.camel.SpoolComponent" />
+
 	<bean id="James" class="org.apache.james.James"/>
 	
 	<bean id="mailetcontext" class="org.apache.james.JamesMailetContext"/>
 
 	<!-- The James Spool Manager block  -->
+	<!-- 
 	<bean id="spoolmanager" class="org.apache.james.transport.JamesSpoolManager" />
 	
 	<bean id="mailProcessor" class="org.apache.james.container.spring.StateAwareProcessorList">
     	<property name="logRegistry" ref="logRegistry"/>
 	    <property name="configurationRegistry" ref="configurationRegistry"/>
 	</bean>
-
+    -->
 	<bean id="matcherpackages" class="org.apache.james.transport.JamesMatcherLoader" />
 
 	<bean id="mailetpackages" class="org.apache.james.transport.JamesMailetLoader" />



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org