You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/02/22 15:44:03 UTC
svn commit: r912589 - in /james/server/trunk:
spoolmanager/src/main/java/org/apache/james/transport/
spoolmanager/src/main/java/org/apache/james/transport/camel/
spring-deployment/src/main/config/james/
Author: norman
Date: Mon Feb 22 14:44:02 2010
New Revision: 912589
URL: http://svn.apache.org/viewvc?rev=912589&view=rev
Log:
Commit missing pieces for using Camel as SpoolManager/MailetProcessor replacement. James now use CAMEL with this configuration (JAMES-971)
Added:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java
Modified:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java
james/server/trunk/spring-deployment/src/main/config/james/log4j.properties
james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/LinearProcessor.java Mon Feb 22 14:44:02 2010
@@ -21,8 +21,6 @@
package org.apache.james.transport;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
@@ -47,7 +45,6 @@
import org.apache.mailet.MailAddress;
import org.apache.mailet.Mailet;
import org.apache.mailet.MailetConfig;
-import org.apache.mailet.MailetException;
import org.apache.mailet.Matcher;
import org.apache.mailet.MatcherConfig;
import org.apache.mailet.base.GenericMailet;
@@ -392,7 +389,7 @@
recipients = new ArrayList<MailAddress>(0);
} else if (recipients != mail.getRecipients()) {
//Make sure all the objects are MailAddress objects
- verifyMailAddresses(recipients);
+ ProcessorUtil.verifyMailAddresses(recipients);
}
} catch (MessagingException me) {
// look in the matcher's mailet's init attributes
@@ -410,7 +407,7 @@
recipients = mail.getRecipients();
// no need to verify addresses
} else {
- handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException);
+ ProcessorUtil.handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException, logger);
}
}
@@ -455,7 +452,7 @@
try {
mailet.service(mail);
// Make sure all the recipients are still MailAddress objects
- verifyMailAddresses(mail.getRecipients());
+ ProcessorUtil.verifyMailAddresses(mail.getRecipients());
} catch (MessagingException me) {
MailetConfig mailetConfig = mailet.getMailetConfig();
String onMailetException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMailetException");
@@ -467,9 +464,9 @@
if (onMailetException.compareTo("ignore") == 0) {
// ignore the exception and continue
// this option should not be used if the mail object can be changed by the mailet
- verifyMailAddresses(mail.getRecipients());
+ ProcessorUtil.verifyMailAddresses(mail.getRecipients());
} else {
- handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException);
+ ProcessorUtil.handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException, logger);
}
}
@@ -503,64 +500,9 @@
}
- /**
- * Checks that all objects in this class are of the form MailAddress.
- *
- * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
- */
- private void verifyMailAddresses(Collection<MailAddress> col) throws MessagingException {
- try {
- MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
-
- // Why is this here? According to the javadoc for
- // java.util.Collection.toArray(Object[]), this should
- // never happen. The exception will be thrown.
- if (addresses.length != col.size()) {
- throw new MailetException("The recipient list contains objects other than MailAddress objects");
- }
- } catch (ArrayStoreException ase) {
- throw new MailetException("The recipient list contains objects other than MailAddress objects");
- }
- }
+
- /**
- * This is a helper method that updates the state of the mail object to
- * Mail.ERROR as well as recording the exception to the log
- *
- * @param me the exception to be handled
- * @param mail the mail being processed when the exception was generated
- * @param offendersName the matcher or mailet than generated the exception
- * @param nextState the next state to set
- *
- * @throws MessagingException thrown always, rethrowing the passed in exception
- */
- private void handleException(MessagingException me, Mail mail, String offendersName, String nextState) throws MessagingException {
- System.err.println("exception! " + me);
- mail.setState(nextState);
- StringWriter sout = new StringWriter();
- PrintWriter out = new PrintWriter(sout, true);
- StringBuffer exceptionBuffer =
- new StringBuffer(128)
- .append("Exception calling ")
- .append(offendersName)
- .append(": ")
- .append(me.getMessage());
- out.println(exceptionBuffer.toString());
- Exception e = me;
- while (e != null) {
- e.printStackTrace(out);
- if (e instanceof MessagingException) {
- e = ((MessagingException)e).getNextException();
- } else {
- e = null;
- }
- }
- String errorString = sout.toString();
- mail.setErrorMessage(errorString);
- logger.error(errorString);
- throw me;
- }
-
+
/**
* <p>Initialize the processor matcher/mailet list.</p>
*/
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java?rev=912589&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/ProcessorUtil.java Mon Feb 22 14:44:02 2010
@@ -0,0 +1,93 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+
+package org.apache.james.transport;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collection;
+
+import javax.mail.MessagingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.mailet.Mail;
+import org.apache.mailet.MailAddress;
+import org.apache.mailet.MailetException;
+
+public class ProcessorUtil {
+
+ /**
+ * This is a helper method that updates the state of the mail object to
+ * Mail.ERROR as well as recording the exception to the log
+ *
+ * @param me the exception to be handled
+ * @param mail the mail being processed when the exception was generated
+ * @param offendersName the matcher or mailet than generated the exception
+ * @param nextState the next state to set
+ *
+ * @throws MessagingException thrown always, rethrowing the passed in exception
+ */
+ public static void handleException(MessagingException me, Mail mail, String offendersName, String nextState, Log logger) throws MessagingException {
+ System.err.println("exception! " + me);
+ mail.setState(nextState);
+ StringWriter sout = new StringWriter();
+ PrintWriter out = new PrintWriter(sout, true);
+ StringBuffer exceptionBuffer =
+ new StringBuffer(128)
+ .append("Exception calling ")
+ .append(offendersName)
+ .append(": ")
+ .append(me.getMessage());
+ out.println(exceptionBuffer.toString());
+ Exception e = me;
+ while (e != null) {
+ e.printStackTrace(out);
+ if (e instanceof MessagingException) {
+ e = ((MessagingException)e).getNextException();
+ } else {
+ e = null;
+ }
+ }
+ String errorString = sout.toString();
+ mail.setErrorMessage(errorString);
+ logger.error(errorString);
+ throw me;
+ }
+
+ /**
+ * Checks that all objects in this class are of the form MailAddress.
+ *
+ * @throws MessagingException when the <code>Collection</code> contains objects that are not <code>MailAddress</code> objects
+ */
+ public static void verifyMailAddresses(Collection<MailAddress> col) throws MessagingException {
+ try {
+ MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
+
+ // Why is this here? According to the javadoc for
+ // java.util.Collection.toArray(Object[]), this should
+ // never happen. The exception will be thrown.
+ if (addresses.length != col.size()) {
+ throw new MailetException("The recipient list contains objects other than MailAddress objects");
+ }
+ } catch (ArrayStoreException ase) {
+ throw new MailetException("The recipient list contains objects other than MailAddress objects");
+ }
+ }
+}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailMessage.java Mon Feb 22 14:44:02 2010
@@ -30,6 +30,12 @@
public final static String STATE = "mailstate";
private Mail mail;
+
+ /**
+ *
+ * @param mail the mail message to use as Body of the message
+ *
+ */
public MailMessage(Mail mail) {
this.mail = mail;
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailProcessorRouteBuilder.java Mon Feb 22 14:44:02 2010
@@ -20,13 +20,16 @@
package org.apache.james.transport.camel;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.mail.MessagingException;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.model.ChoiceDefinition;
import org.apache.commons.configuration.ConfigurationException;
@@ -34,27 +37,37 @@
import org.apache.commons.logging.Log;
import org.apache.james.lifecycle.Configurable;
import org.apache.james.lifecycle.LogEnabled;
+import org.apache.james.services.SpoolManager;
+import org.apache.james.transport.MailetConfigImpl;
import org.apache.james.transport.MailetLoader;
+import org.apache.james.transport.MatcherConfigImpl;
import org.apache.james.transport.MatcherLoader;
+import org.apache.mailet.Mail;
import org.apache.mailet.Mailet;
+import org.apache.mailet.MailetConfig;
import org.apache.mailet.Matcher;
+import org.apache.mailet.MatcherConfig;
+import org.apache.mailet.base.GenericMailet;
import org.apache.mailet.base.MatcherInverter;
/**
* Build up the Camel Route by parsing the spoolmanager.xml configuration file.
*
- * @author norman
- *
+ * TODO: - Limit Threads
*/
-public class MailProcessorRouteBuilder extends RouteBuilder implements Configurable, LogEnabled {
+public class MailProcessorRouteBuilder extends RouteBuilder implements SpoolManager, Configurable, LogEnabled {
private MatcherLoader matcherLoader;
private HierarchicalConfiguration config;
private MailetLoader mailetLoader;
private Log logger;
- private List<Mailet> mailets = new ArrayList<Mailet>();
+ private final Map<String,List<Mailet>> mailets = new HashMap<String,List<Mailet>>();
+ private final Map<String,List<Matcher>> matchers = new HashMap<String,List<Matcher>>();
+
+ private final List<String> processors = new ArrayList<String>();
+
@Resource(name = "matcherpackages")
public void setMatcherLoader(MatcherLoader matcherLoader) {
this.matcherLoader = matcherLoader;
@@ -73,16 +86,26 @@
@Override
public void configure() throws Exception {
-
- ChoiceDefinition spoolDef = from("spool://spoolRepository").choice();
+ Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
+
+ // start to consume from the spool
+ ChoiceDefinition spoolDef = from("spool://spoolRepository")
+
+ // start first choice
+ .choice();
List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
for (int i = 0; i < processorConfs.size(); i++) {
final HierarchicalConfiguration processorConf = processorConfs.get(i);
String processorName = processorConf.getString("[@name]");
+
+ processors.add(processorName);
+ mailets.put(processorName, new ArrayList<Mailet>());
+ matchers.put(processorName, new ArrayList<Matcher>());
+
// Check which route we need to go
- ChoiceDefinition processorDef = spoolDef.when(header(MailMessage.STATE).isEqualTo(processorName)).setHeader("currentProcessor", constant(processorName));
+ ChoiceDefinition processorDef = spoolDef.when(header(MailMessage.STATE).isEqualTo(processorName));
final List<HierarchicalConfiguration> mailetConfs = processorConf.configurationsAt("mailet");
// Loop through the mailet configuration, load
@@ -152,42 +175,113 @@
throw new ConfigurationException("Unable to init mailet", ex);
}
if (mailet != null && matcher != null) {
- // Store the matcher to use for splitter
- processorDef.setHeader(MatcherSplitter.MATCHER_HEADER, constant(matcher))
- // do splitting
- .split().method(MatcherSplitter.class)
- // check if we need to execute the mailet and remove headers
- .choice().when(header(MatcherSplitter.MATCHER_MATCHED_HEADER).isEqualTo(true)).process(new MailetProcessor(mailet)).removeHeader(MatcherSplitter.MATCHER_MATCHED_HEADER)
+ String onMatchException = null;
+ MailetConfig mailetConfig = mailet.getMailetConfig();
+
+ if (mailetConfig instanceof MailetConfigImpl) {
+ onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
+ }
+
+ // Store the matcher to use for splitter in properties
+ processorDef.setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher))
+
+ // store the config in properties
+ .setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException))
+
+ // store the logger in properties
+ .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger))
+
+ // do splitting of the mail based on the stored matcher
+ .split().method(MatcherSplitter.class)
+
+ // start first choice
+ .choice()
+
+ // check if the state of the mail is the same as the
+ // current processor. if not we have can just store it to spool an stop the route processing
+ .when(header(MailMessage.STATE).isNotEqualTo(processorName)).to("spool://spoolRepository").stop()
+
+ // if not continue the route
+ .otherwise()
+
+ // start second choice
+ .choice()
+
+ // check if we need to execute the mailet. If so execute it and remove the header on the end
+ .when(header(MatcherSplitter.MATCHER_MATCHED_HEADER).isEqualTo("true")).process(new MailetProcessor(mailet, logger)).removeHeader(MatcherSplitter.MATCHER_MATCHED_HEADER)
+
+ // end second choice
+ .end()
+
+ // end first choice
.end()
- .end()
- // remove matcher from header
- .removeHeader(MatcherSplitter.MATCHER_HEADER);
+
+ // remove matcher from properties
+ .removeProperty(MatcherSplitter.MATCHER_PROPERTY)
+
+ // remove config from properties
+ .removeProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY)
- // store mailet for later destroy on shutdown
- mailets.add(mailet);
+ // remove logger from properties
+ .removeProperty(MatcherSplitter.LOGGER_PROPERTY);
+
+ // store mailet and matcher
+ mailets.get(processorName).add(mailet);
+ matchers.get(processorName).add(matcher);
}
}
- processorDef.to("spool://spoolRepository");
+
+ processorDef
+ // start choice
+ .choice()
+
+ // when the mail state did not change till yet ( the end of the route) we need to call the TerminatingMailet to
+ // make sure we don't fall into a endless loop
+ .when(header(MailMessage.STATE).isEqualTo(processorName)).process(terminatingMailetProcessor)
+
+ // end the choice
+ .end()
+
+ // route everything to the spool now
+ .to("spool://spoolRepository");
}
-
- // just use a mock for now
- spoolDef.end();
}
+ /**
+ * Destroy all mailets and matchers
+ */
@PreDestroy
public void dispose() {
- Iterator<Mailet> it = mailets.iterator();
boolean debugEnabled = logger.isDebugEnabled();
+
+ Iterator<List<Mailet>> it = mailets.values().iterator();
while (it.hasNext()) {
- Mailet mailet = it.next();
- if (debugEnabled) {
- logger.debug("Shutdown mailet " + mailet.getMailetInfo());
+ List<Mailet> mList = it.next();
+ for (int i = 0; i < mList.size(); i++) {
+ Mailet m = mList.get(i);
+ if (debugEnabled) {
+ logger.debug("Shutdown mailet " + m.getMailetInfo());
+ }
+ m.destroy();
}
- mailet.destroy();
+
}
+
+ Iterator<List<Matcher>> mit = matchers.values().iterator();
+ while (mit.hasNext()) {
+ List<Matcher> mList = mit.next();
+ for (int i = 0; i < mList.size(); i++) {
+ Matcher m = mList.get(i);
+ if (debugEnabled) {
+ logger.debug("Shutdown matcher " + m.getMatcherInfo());
+ }
+ m.destroy();
+ }
+
+ }
}
/*
@@ -207,4 +301,91 @@
}
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.services.SpoolManager#getMailetConfigs(java.lang.String)
+ */
+ public List<MailetConfig> getMailetConfigs(String processorName) {
+ List<MailetConfig> mailetConfigs = new ArrayList<MailetConfig>();
+ Iterator<Mailet> iterator = mailets.get(processorName).iterator();
+ while (iterator.hasNext()) {
+ Mailet mailet = (Mailet) iterator.next();
+ MailetConfig mailetConfig = mailet.getMailetConfig();
+ if (mailetConfig == null) mailetConfigs.add(new MailetConfigImpl()); // placeholder
+ else mailetConfigs.add(mailetConfig);
+ }
+ return mailetConfigs;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.services.SpoolManager#getMatcherConfigs(java.lang.String)
+ */
+ public List<MatcherConfig> getMatcherConfigs(String processorName) {
+ List<MatcherConfig> matcherConfigs = new ArrayList<MatcherConfig>();
+ Iterator<Matcher> iterator = matchers.get(processorName).iterator();
+ while (iterator.hasNext()) {
+ Matcher matcher = (Matcher) iterator.next();
+ MatcherConfig matcherConfig = matcher.getMatcherConfig();
+ if (matcherConfig == null) matcherConfigs.add(new MatcherConfigImpl()); // placeholder
+ else matcherConfigs.add(matcherConfig);
+ }
+ return matcherConfigs;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.services.SpoolManager#getProcessorNames()
+ */
+ public String[] getProcessorNames() {
+ return processors.toArray(new String[processors.size()]);
+ }
+
+
+ /**
+ * Mailet which protect us to not fall into an endless loop caused by an configuration error
+ *
+ */
+ private final class TerminatingMailet extends GenericMailet {
+ /**
+ * The name of the mailet used to terminate the mailet chain. The
+ * end of the matcher/mailet chain must be a matcher that matches
+ * all mails and a mailet that sets every mail to GHOST status.
+ * This is necessary to ensure that mails are removed from the spool
+ * in an orderly fashion.
+ */
+ private static final String TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.mailet.base.GenericMailet#service(org.apache.mailet.Mail)
+ */
+ public void service(Mail mail) {
+ if (!(Mail.ERROR.equals(mail.getState()))) {
+ // Don't complain if we fall off the end of the
+ // error processor. That is currently the
+ // normal situation for James, and the message
+ // will show up in the error store.
+ StringBuffer warnBuffer = new StringBuffer(256)
+ .append("Message ")
+ .append(mail.getName())
+ .append(" reached the end of this processor, and is automatically deleted. This may indicate a configuration error.");
+ logger.warn(warnBuffer.toString());
+ }
+
+ // Set the mail to ghost state
+ mail.setState(Mail.GHOST);
+ }
+
+ @Override
+ public String getMailetInfo() {
+ return getMailetName();
+ }
+
+ @Override
+ public String getMailetName() {
+ return TERMINATING_MAILET_NAME;
+ }
+ }
+
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MailetProcessor.java Mon Feb 22 14:44:02 2010
@@ -1,9 +1,35 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
package org.apache.james.transport.camel;
+import java.util.Locale;
+
+import javax.mail.MessagingException;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.commons.logging.Log;
+import org.apache.james.transport.MailetConfigImpl;
+import org.apache.james.transport.ProcessorUtil;
import org.apache.mailet.Mail;
import org.apache.mailet.Mailet;
+import org.apache.mailet.MailetConfig;
/**
* Mailet wrapper which execute a Mailet in a Processor
@@ -12,23 +38,47 @@
public class MailetProcessor implements Processor{
private Mailet mailet;
+ private Log logger;
- public MailetProcessor(Mailet mailet) {
+ /**
+ * Mailet to call on process
+ *
+ * @param mailet
+ */
+ public MailetProcessor(Mailet mailet, Log logger) {
this.mailet = mailet;
+ this.logger = logger;
}
- /*
- * (non-Javadoc)
- * @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
+ /**
+ * Call the wrapped mailet for the exchange
*/
+ @SuppressWarnings("unchecked")
public void process(Exchange exchange) throws Exception {
+ Mail mail = (Mail) exchange.getIn().getBody();
try {
- mailet.service((Mail)exchange.getIn().getBody());
- } catch (Exception e) {
- e.printStackTrace();
- throw e;
+ mailet.service(mail);
+ } catch (MessagingException me) {
+ String onMailetException = null;
+
+ MailetConfig mailetConfig = mailet.getMailetConfig();
+ if (mailetConfig instanceof MailetConfigImpl) {
+ onMailetException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMailetException");
+ }
+ if (onMailetException == null) {
+ onMailetException = Mail.ERROR;
+ } else {
+ onMailetException = onMailetException.trim().toLowerCase(Locale.US);
+ }
+ if (onMailetException.compareTo("ignore") == 0) {
+ // ignore the exception and continue
+ // this option should not be used if the mail object can be
+ // changed by the mailet
+ ProcessorUtil.verifyMailAddresses(mail.getRecipients());
+ } else {
+ ProcessorUtil.handleException(me, mail, mailet.getMailetConfig().getMailetName(), onMailetException, logger);
+ }
}
-
}
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/MatcherSplitter.java Mon Feb 22 14:44:02 2010
@@ -23,13 +23,16 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Locale;
import javax.mail.MessagingException;
import org.apache.camel.Body;
-import org.apache.camel.Header;
import org.apache.camel.InOnly;
+import org.apache.camel.Property;
+import org.apache.commons.logging.Log;
import org.apache.james.core.MailImpl;
+import org.apache.james.transport.ProcessorUtil;
import org.apache.mailet.Mail;
import org.apache.mailet.MailAddress;
import org.apache.mailet.Matcher;
@@ -43,10 +46,21 @@
@InOnly
public class MatcherSplitter {
+ /**
+ * Headername which is used to indicate that the matcher matched
+ */
public final static String MATCHER_MATCHED_HEADER = "matched";
- public final static String MATCHER_HEADER = "matcher";
+ /**
+ * Headername under which the matcher is stored
+ */
+ public final static String MATCHER_PROPERTY = "matcher";
+ public final static String ON_MATCH_EXCEPTION_PROPERTY = "onMatchException";
+
+ public final static String LOGGER_PROPERTY = "logger";
+
+
/**
* Generate a List of MailMessage instances for the give @Body. This is done by using the given Matcher to see if we need more then one instance of the
* MailMessage
@@ -57,44 +71,77 @@
* @throws MessagingException
*/
@SuppressWarnings("unchecked")
- public List<MailMessage> split(@Header(MATCHER_HEADER) Matcher matcher,@Body Mail mail) throws MessagingException {
+ public List<MailMessage> split(@Property(MATCHER_PROPERTY) Matcher matcher, @Property(ON_MATCH_EXCEPTION_PROPERTY) String onMatchException, @Property(LOGGER_PROPERTY) Log logger, @Body Mail mail) throws MessagingException {
List<MailMessage> mails = new ArrayList<MailMessage>();
boolean fullMatch = false;
+ // call the matcher
+ Collection<MailAddress> matchedRcpts = null;
- Collection<MailAddress> matchedRcpts = matcher.match(mail);
+ try {
+ matchedRcpts = matcher.match(mail);
+ if (matchedRcpts == null) {
+ //In case the matcher returned null, create an empty Collection
+ matchedRcpts = new ArrayList<MailAddress>(0);
+ } else if (matchedRcpts != mail.getRecipients()) {
+ //Make sure all the objects are MailAddress objects
+ ProcessorUtil.verifyMailAddresses(matchedRcpts);
+ }
+ } catch (MessagingException me) {
+
+ if (onMatchException == null) {
+ onMatchException = Mail.ERROR;
+ } else {
+ onMatchException = onMatchException.trim().toLowerCase(Locale.US);
+ }
+ if (onMatchException.compareTo("nomatch") == 0) {
+ //In case the matcher returned null, create an empty Collection
+ matchedRcpts = new ArrayList<MailAddress>(0);
+ } else if (onMatchException.compareTo("matchall") == 0) {
+ matchedRcpts = mail.getRecipients();
+ // no need to verify addresses
+ } else {
+ ProcessorUtil.handleException(me, mail, matcher.getMatcherConfig().getMatcherName(), onMatchException, logger);
+ }
+ }
// check if the matcher matched
- if (matchedRcpts != null && matchedRcpts.isEmpty() == false) {
+ if ( matchedRcpts != null && matchedRcpts.isEmpty() == false) {
+ List<MailAddress> rcpts = new ArrayList<MailAddress>(mail.getRecipients());
+
+ Iterator<MailAddress> rcptsIterator = matchedRcpts.iterator();
+
+ while(rcptsIterator.hasNext()) {
+ // loop through the recipients and remove the recipiends that matched
+ rcpts.remove(rcptsIterator.next());
+ }
- // check if we need to create another instance of the mail. This is only needed if not all
- // recipients matched
- if (matchedRcpts.equals(mail.getRecipients()) == false) {
+ if (rcpts.isEmpty()) {
+ // all recipients matched
+ fullMatch = true;
+ } else {
+ mail.setRecipients(rcpts);
+
Mail newMail = new MailImpl(mail);
newMail.setRecipients(matchedRcpts);
-
- MailMessage newmsg = new MailMessage(newMail);
+ MailMessage newmsg = new MailMessage(newMail);
+
// Set a header because the matcher matched. This can be used later when processing the route
newmsg.setHeader(MATCHER_MATCHED_HEADER, true);
+
+ // add the new generated mail to the mails list
mails.add(newmsg);
-
- List<MailAddress> rcpts = new ArrayList<MailAddress>(mail.getRecipients());
- Iterator<MailAddress> rcptsIterator = newMail.getRecipients().iterator();
- while(rcptsIterator.hasNext()) {
- rcpts.remove(rcptsIterator.next());
- }
- mail.setRecipients(rcpts);
- } else {
- // all recipients matched
- fullMatch = true;
}
}
+
MailMessage mailMsg = new MailMessage(mail);
if (fullMatch) {
// Set a header because the matcher matched. This can be used later when processing the route
mailMsg.setHeader(MATCHER_MATCHED_HEADER, true);
}
+
+ // add mailMsg to the mails list
mails.add(mailMsg);
return mails;
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolComponent.java Mon Feb 22 14:44:02 2010
@@ -19,7 +19,6 @@
package org.apache.james.transport.camel;
-import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
@@ -28,12 +27,14 @@
import org.apache.camel.impl.DefaultComponent;
import org.apache.commons.logging.Log;
import org.apache.james.lifecycle.LogEnabled;
-import org.apache.james.services.SpoolManager;
import org.apache.james.services.SpoolRepository;
-import org.apache.mailet.MailetConfig;
-import org.apache.mailet.MatcherConfig;
-public class SpoolComponent extends DefaultComponent implements SpoolManager, LogEnabled {
+/**
+ * Component which acts as SpoolEndPoint factory
+ *
+ *
+ */
+public class SpoolComponent extends DefaultComponent implements LogEnabled {
private SpoolRepository spool;
private Log log;
@@ -49,21 +50,6 @@
return new SpoolEndPoint(uri,this, spool, log);
}
- public List<MailetConfig> getMailetConfigs(String processorName) {
- // TODO Auto-generated method stub
- return null;
- }
-
- public List<MatcherConfig> getMatcherConfigs(String processorName) {
- // TODO Auto-generated method stub
- return null;
- }
-
- public String[] getProcessorNames() {
- // TODO Auto-generated method stub
- return null;
- }
-
/*
* (non-Javadoc)
* @see org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolConsumer.java Mon Feb 22 14:44:02 2010
@@ -19,6 +19,10 @@
package org.apache.james.transport.camel;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.DefaultConsumer;
@@ -27,12 +31,19 @@
import org.apache.james.services.SpoolRepository;
import org.apache.mailet.Mail;
+/**
+ * Consumer implementation which consum Mail objects of the SpoolRepository
+ *
+ *
+ */
public class SpoolConsumer extends DefaultConsumer{
private SpoolRepository spool;
private Processor processor;
private Log log;
+ private ExecutorService executor = Executors.newSingleThreadExecutor();
+ private Future<?> f;
public SpoolConsumer(DefaultEndpoint endpoint, Processor processor, SpoolRepository spool, Log log) {
super(endpoint, processor);
@@ -42,18 +53,21 @@
}
- public Exchange receive() {
+ /**
+ * Receive a Mail object of the SpoolRepository when one is ready. This method will block until a Mail object is ready to process
+ *
+ * @return exchange exchange which holds the MailMessage build up on the Mail
+ */
+ private Exchange receive() {
Exchange ex = getEndpoint().createExchange();
Mail mail;
try {
mail = spool.accept();
ex.setIn(new MailMessage(mail));
+ log.debug("Mail " + mail.toString() + " ready for process. Start to process exchange " +ex);
processor.process(ex);
} catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
ex.setException(e);
-
}
return ex;
@@ -63,7 +77,7 @@
@Override
protected void doStart() throws Exception {
super.doStart();
- Thread t = new Thread(new Runnable() {
+ f = executor.submit(new Runnable() {
public void run() {
while(true) {
@@ -72,15 +86,17 @@
}
});
- t.setDaemon(true);
- t.start();
}
@Override
protected void doStop() throws Exception {
super.doStop();
-
+ try {
+ f.cancel(true);
+ } catch (Exception e) {
+ // ignore
+ }
}
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolEndPoint.java Mon Feb 22 14:44:02 2010
@@ -26,6 +26,10 @@
import org.apache.commons.logging.Log;
import org.apache.james.services.SpoolRepository;
+/**
+ * Endpoint which handles Producer and Consumer for SpoolRepositories
+ *
+ */
public class SpoolEndPoint extends DefaultEndpoint{
private SpoolRepository spool;
@@ -61,9 +65,7 @@
* @see org.apache.camel.Endpoint#createConsumer(org.apache.camel.Processor)
*/
public Consumer createConsumer(Processor processor) throws Exception {
- SpoolConsumer consumer = new SpoolConsumer(this,processor,spool,log);
- return consumer;
-
+ return new SpoolConsumer(this,processor,spool,log);
}
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/SpoolProducer.java Mon Feb 22 14:44:02 2010
@@ -27,6 +27,11 @@
import org.apache.james.services.SpoolRepository;
import org.apache.mailet.Mail;
+/**
+ * Producer which handles store and remove of Mails. If a Mail is has state Mail.GHOST or the recipients are size of 0 it will get removed from the spool, otherwise it will get stored
+ * back to the spool
+ *
+ */
public class SpoolProducer extends DefaultProducer {
private SpoolRepository spool;
@@ -44,7 +49,6 @@
* @see org.apache.camel.Processor#process(org.apache.camel.Exchange)
*/
public void process(Exchange exchange) throws Exception {
- // Exchange newExchange = getEndpoint().createExchange(exchange);
Mail mail = (Mail) exchange.getIn().getBody();
// Only remove an email from the spool is processing is
@@ -55,11 +59,12 @@
StringBuffer debugBuffer = new StringBuffer(64).append("==== Removed from spool mail ").append(mail.getName()).append("====");
log.debug(debugBuffer.toString());
}
+ // Dispose mail
LifecycleUtil.dispose(mail);
} else {
-
try {
+ // store mail back to spool
spool.store(mail);
} catch (Exception e) {
e.printStackTrace();
Modified: james/server/trunk/spring-deployment/src/main/config/james/log4j.properties
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/log4j.properties?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/log4j.properties (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/log4j.properties Mon Feb 22 14:44:02 2010
@@ -129,7 +129,8 @@
-
+# logger for camel
+log4j.logger.org.apache.camel=WARN, CONS, FILE
log4j.logger.org.springframework=WARN, CONS, FILE
log4j.logger.org.apache.james=DEBUG, CONS, FILE
#log4j.logger.james: set from default value WARN to INFO or even DEBUG to see (even) more logging
Modified: james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=912589&r1=912588&r2=912589&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Mon Feb 22 14:44:02 2010
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN 2.0//EN" "http://www.springframework.org/dtd/spring-beans-2.0.dtd">
<!--
! Licensed to the Apache Software Foundation (ASF) under one ! ! or
@@ -17,7 +16,13 @@
-->
-<beans>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:camel="http://camel.apache.org/schema/spring"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+
<!--
** JMX part ** to enable exposure of JMX, activate the following beans
@@ -76,7 +81,7 @@
<entry key="smtpProtocolHandlerChain" value="smtpserver"/>
<entry key="pop3ProtocolHandlerChain" value="pop3server"/>
<entry key="remoteProtocolHandlerChain" value="remotemanager"/>
- <entry key="mailProcessor" value="spoolmanager"/>
+ <entry key="spool" value="spoolmanager"/>
</map>
</property>
</bean>
@@ -96,7 +101,7 @@
<entry key="smtpProtocolHandlerChain" value="smtpserver"/>
<entry key="pop3ProtocolHandlerChain" value="pop3server"/>
<entry key="remoteProtocolHandlerChain" value="remoteManager"/>
- <entry key="mailProcessor" value="spoolmanager"/>
+ <entry key="spool" value="spoolmanager"/>
</map>
</property>
</bean>
@@ -107,22 +112,33 @@
<bean id="instanceFactory" class="org.apache.james.container.spring.SpringInstanceFactory"/>
-
<!-- Add support for Persistence annotations -->
<bean class="org.springframework.orm.jpa.support.PersistenceAnnotationBeanPostProcessor"/>
+ <!-- Change trace to true for debugging purposes -->
+ <camelContext id="jamesCamelContext" xmlns="http://camel.apache.org/schema/spring" trace="false">
+ <routeBuilder ref="processorRoute" />
+ </camelContext>
+
+ <!-- Build the camelroute from the spoolmanager.xml -->
+ <bean id="spoolmanager" name="processorRoute" class="org.apache.james.transport.camel.MailProcessorRouteBuilder"/>
+
+ <!-- The component for the spool which handles consuming and producing -->
+ <bean id="spool" class="org.apache.james.transport.camel.SpoolComponent" />
+
<bean id="James" class="org.apache.james.James"/>
<bean id="mailetcontext" class="org.apache.james.JamesMailetContext"/>
<!-- The James Spool Manager block -->
+ <!--
<bean id="spoolmanager" class="org.apache.james.transport.JamesSpoolManager" />
<bean id="mailProcessor" class="org.apache.james.container.spring.StateAwareProcessorList">
<property name="logRegistry" ref="logRegistry"/>
<property name="configurationRegistry" ref="configurationRegistry"/>
</bean>
-
+ -->
<bean id="matcherpackages" class="org.apache.james.transport.JamesMatcherLoader" />
<bean id="mailetpackages" class="org.apache.james.transport.JamesMailetLoader" />
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org