You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/12/21 17:59:53 UTC
svn commit: r1051556 - in /james/server/trunk:
container-spring/src/main/config/james/
mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/
mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/
Author: norman
Date: Tue Dec 21 16:59:52 2010
New Revision: 1051556
URL: http://svn.apache.org/viewvc?rev=1051556&view=rev
Log:
some refactoring to make it easier to reuse parts of mailetcontainer-camel and mailetcontainer-library.
Added:
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailetContainer.java
james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/AbstractMailProcessorList.java
Modified:
james/server/trunk/container-spring/src/main/config/james/mailetcontainer.xml
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
Modified: james/server/trunk/container-spring/src/main/config/james/mailetcontainer.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/container-spring/src/main/config/james/mailetcontainer.xml?rev=1051556&r1=1051555&r2=1051556&view=diff
==============================================================================
--- james/server/trunk/container-spring/src/main/config/james/mailetcontainer.xml (original)
+++ james/server/trunk/container-spring/src/main/config/james/mailetcontainer.xml Tue Dec 21 16:59:52 2010
@@ -20,12 +20,12 @@
<!-- See http://james.apache.org/server/3/config.html for usage -->
-<mailetcontainer>
+<mailetcontainer enableJmx="true">
<postmaster>Postmaster@localhost</postmaster>
<threads> 20 </threads>
- <processor name="root">
+ <processor name="root" enableJmx="true">
<mailet match="All" class="PostmasterAlias"/>
<mailet match="RelayLimit=30" class="Null"/>
<mailet match="SMTPAuthSuccessful" class="ToProcessor">
@@ -40,7 +40,7 @@
</mailet>
</processor>
- <processor name="transport">
+ <processor name="transport" enableJmx="true">
<mailet match="SMTPAuthSuccessful" class="SetMimeHeader">
<name>X-UserIsAuth</name>
<value>true</value>
@@ -69,31 +69,31 @@
</mailet>
</processor>
- <processor name="error">
+ <processor name="error" enableJmx="true">
<mailet match="All" class="ToRepository">
<repositoryPath>file://var/mail/error/</repositoryPath>
</mailet>
</processor>
- <processor name="spam">
+ <processor name="spam" enableJmx="true">
<mailet match="All" class="ToRepository">
<repositoryPath>file://var/mail/spam/</repositoryPath>
</mailet>
</processor>
- <processor name="local-address-error">
+ <processor name="local-address-error" enableJmx="true">
<mailet match="All" class="ToRepository">
<repositoryPath>file://var/mail/address-error/</repositoryPath>
</mailet>
</processor>
- <processor name="relay-denied">
+ <processor name="relay-denied" enableJmx="true">
<mailet match="All" class="ToRepository">
<repositoryPath>file://var/mail/relay-denied/</repositoryPath>
</mailet>
</processor>
- <processor name="bounces">
+ <processor name="bounces" enableJmx="true">
<mailet match="All" class="DSNBounce">
<passThrough>false</passThrough>
</mailet>
Modified: james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
URL: http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java?rev=1051556&r1=1051555&r2=1051556&view=diff
==============================================================================
--- james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java (original)
+++ james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java Tue Dec 21 16:59:52 2010
@@ -19,57 +19,22 @@
package org.apache.james.mailetcontainer.camel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
import javax.annotation.Resource;
-import javax.mail.MessagingException;
-import javax.management.JMException;
-import javax.management.MalformedObjectNameException;
-import javax.management.NotCompliantMBeanException;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.CamelExecutionException;
-import org.apache.camel.Exchange;
-import org.apache.camel.Processor;
-import org.apache.camel.ProducerTemplate;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.model.RouteDefinition;
-import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.HierarchicalConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.james.lifecycle.api.Configurable;
-import org.apache.james.lifecycle.api.Disposable;
-import org.apache.james.lifecycle.api.LogEnabled;
-import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.MailProcessorList;
-import org.apache.james.mailetcontainer.api.MailProcessorListListener;
-import org.apache.james.mailetcontainer.api.MailetContainerListener;
import org.apache.james.mailetcontainer.api.MailetContainer;
import org.apache.james.mailetcontainer.api.MailetLoader;
import org.apache.james.mailetcontainer.api.MatcherLoader;
-import org.apache.james.mailetcontainer.api.jmx.ProcessorManagementMBean;
-import org.apache.james.mailetcontainer.lib.MailetConfigImpl;
-import org.apache.james.mailetcontainer.lib.MatcherConfigImpl;
-import org.apache.james.mailetcontainer.lib.jmx.JMXMailProcessorListListener;
-import org.apache.james.mailetcontainer.lib.jmx.JMXMailetContainerListener;
+import org.apache.james.mailetcontainer.lib.AbstractMailProcessorList;
import org.apache.james.mailetcontainer.lib.matchers.CompositeMatcher;
import org.apache.mailet.Mail;
import org.apache.mailet.Mailet;
-import org.apache.mailet.MailetConfig;
import org.apache.mailet.MailetContext;
import org.apache.mailet.Matcher;
-import org.apache.mailet.MatcherConfig;
-import org.apache.mailet.base.GenericMailet;
-import org.apache.mailet.base.MatcherInverter;
/**
* Build up the Camel Routes by parsing the mailetcontainer.xml configuration file.
@@ -80,19 +45,12 @@ import org.apache.mailet.base.MatcherInv
* See JAMES-948
*
*/
-public class CamelMailProcessorList implements Configurable, LogEnabled, MailProcessorList, CamelContextAware, ProcessorManagementMBean {
+public class CamelMailProcessorList extends AbstractMailProcessorList implements CamelContextAware{
+ private CamelContext camelContext;
+ private MailetContext mailetContext;
private MatcherLoader matcherLoader;
- private HierarchicalConfiguration config;
private MailetLoader mailetLoader;
- private Log logger;
-
- private final Map<String,List<Mailet>> mailets = new HashMap<String,List<Mailet>>();
- private final Map<String,List<Matcher>> matchers = new HashMap<String,List<Matcher>>();
-
- private final Map<String,MailProcessor> processors = new HashMap<String,MailProcessor>();
- private final UseLatestAggregationStrategy aggr = new UseLatestAggregationStrategy();
- private MailetContext mailetContext;
@Resource(name = "matcherloader")
public void setMatcherLoader(MatcherLoader matcherLoader) {
@@ -109,327 +67,22 @@ public class CamelMailProcessorList impl
this.mailetContext = mailetContext;
}
- private ProducerTemplate producerTemplate;
- private CamelContext camelContext;
- private List<MailProcessorListListener> listeners = Collections.synchronizedList(new ArrayList<MailProcessorListListener>());
- private JMXMailProcessorListListener jmxListener;
- private boolean enableJmx;
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailProcessorList#addListener(org.apache.james.mailetcontainer.api.MailProcessorListListener)
- */
- public void addListener(MailProcessorListListener listener) {
- listeners.add(listener);
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailProcessorList#getListeners()
- */
- public List<MailProcessorListListener> getListeners() {
- return listeners;
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailProcessorList#removeListener(org.apache.james.mailetcontainer.api.MailProcessorListListener)
- */
- public void removeListener(MailProcessorListListener listener) {
- listeners.remove(listener);
- }
-
- @PostConstruct
+ @PostConstruct
public void init() throws Exception {
- getCamelContext().addRoutes(new SpoolRouteBuilder());
-
- if (enableJmx) {
- this.jmxListener = new JMXMailProcessorListListener(this);
- addListener(jmxListener);
- }
- producerTemplate = getCamelContext().createProducerTemplate();
-
+ super.init();
+
// Make sure the camel context get started
// See https://issues.apache.org/jira/browse/JAMES-1069
if (getCamelContext().getStatus().isStopped()) {
getCamelContext().start();
}
-
-
-
- }
-
- /**
- * Destroy all mailets and matchers
- */
- @PreDestroy
- public void dispose() {
- boolean debugEnabled = logger.isDebugEnabled();
-
- Iterator<List<Mailet>> it = mailets.values().iterator();
- while (it.hasNext()) {
- List<Mailet> mList = it.next();
- for (int i = 0; i < mList.size(); i++) {
- Mailet m = mList.get(i);
- if (debugEnabled) {
- logger.debug("Shutdown mailet " + m.getMailetInfo());
- }
- m.destroy();
- }
-
- }
-
- Iterator<List<Matcher>> mit = matchers.values().iterator();
- while (mit.hasNext()) {
- List<Matcher> mList = mit.next();
- for (int i = 0; i < mList.size(); i++) {
- Matcher m = mList.get(i);
- if (debugEnabled) {
- logger.debug("Shutdown matcher " + m.getMatcherInfo());
- }
- m.destroy();
- }
-
- }
- if (jmxListener != null) {
- jmxListener.dispose();
- }
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.lifecycle.Configurable#configure(org.apache.commons.configuration.HierarchicalConfiguration)
- */
- public void configure(HierarchicalConfiguration config) throws ConfigurationException {
- this.config = config;
- this.enableJmx = config.getBoolean("enableJmx", true);;
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
- */
- public void setLog(Log log) {
- this.logger = log;
-
- }
-
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.MailProcessorList#getProcessorNames()
- */
- public String[] getProcessorNames() {
- return processors.keySet().toArray(new String[processors.size()]);
- }
-
-
- /**
- * Mailet which protect us to not fall into an endless loop caused by an configuration error
- *
- */
- private final class TerminatingMailet extends GenericMailet {
- /**
- * The name of the mailet used to terminate the mailet chain. The
- * end of the matcher/mailet chain must be a matcher that matches
- * all mails and a mailet that sets every mail to GHOST status.
- * This is necessary to ensure that mails are removed from the spool
- * in an orderly fashion.
- */
- private static final String TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
-
- /*
- * (non-Javadoc)
- * @see org.apache.mailet.base.GenericMailet#service(org.apache.mailet.Mail)
- */
- public void service(Mail mail) {
- if (!(Mail.ERROR.equals(mail.getState()))) {
- // Don't complain if we fall off the end of the
- // error processor. That is currently the
- // normal situation for James, and the message
- // will show up in the error store.
- StringBuffer warnBuffer = new StringBuffer(256)
- .append("Message ")
- .append(mail.getName())
- .append(" reached the end of this processor, and is automatically deleted. This may indicate a configuration error.");
- logger.warn(warnBuffer.toString());
- }
-
- // Set the mail to ghost state
- mail.setState(Mail.GHOST);
- }
-
- @Override
- public String getMailetInfo() {
- return getMailetName();
- }
-
- @Override
- public String getMailetName() {
- return TERMINATING_MAILET_NAME;
- }
- }
-
- /**
- * Return the endpoint for the processorname.
- *
- * This will return a "direct" endpoint.
- *
- * @param processorName
- * @return endPoint
- */
- protected String getEndpoint(String processorName) {
- return "direct:processor." + processorName;
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailProcessor#service(org.apache.mailet.Mail)
- */
- public void service(Mail mail) throws MessagingException {
- long start = System.currentTimeMillis();
- MessagingException ex = null;
- MailProcessor processor = getProcessor(mail.getState());
-
- if (processor != null) {
- try {
- processor.service(mail);
- } catch (MessagingException e) {
- ex = e;
- throw e;
- } finally {
- long end = System.currentTimeMillis() - start;
- for (int i = 0; i < listeners.size(); i++) {
- MailProcessorListListener listener = listeners.get(i);
-
- listener.afterProcessor(processor, mail.getName(), end, ex);
- }
- }
- } else {
- throw new MessagingException("No processor found for mail " + mail.getName() + " with state " + mail.getState());
- }
}
/*
* (non-Javadoc)
- * @see org.apache.james.transport.ProcessorList#getProcessor(java.lang.String)
- */
- public MailProcessor getProcessor(String name) {
- return processors.get(name);
- }
-
-
- /**
- * {@link Processor} which just call the {@link CamelMailProcessorList#service(Mail) method
*
- *
- */
- private final class MailCamelProcessor implements Processor {
-
- public void process(Exchange exchange) throws Exception {
- service(exchange.getIn().getBody(Mail.class));
- }
-
- }
-
- private final class RemovePropertiesProcessor implements Processor {
-
- public void process(Exchange exchange) throws Exception {
- exchange.removeProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY);
- exchange.removeProperty(MatcherSplitter.MATCHER_PROPERTY);
- }
- }
-
-
- private final class ChildProcessor implements MailetContainer, Disposable {
-
-
- private String processorName;
- private List<MailetContainerListener> listeners = Collections.synchronizedList(new ArrayList<MailetContainerListener>());
- private JMXMailetContainerListener jmxListener;
-
- public ChildProcessor(String processorName){
- this.processorName = processorName;
- }
-
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailProcessor#service(org.apache.mailet.Mail)
- */
- public void service(Mail mail) throws MessagingException {
- try {
- producerTemplate.sendBody(getEndpoint(processorName), mail);
-
- } catch (CamelExecutionException ex) {
- throw new MessagingException("Unable to process mail " + mail.getName(),ex);
- }
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailetContainer#getMailets()
- */
- public List<Mailet> getMailets() {
- return Collections.unmodifiableList(mailets.get(processorName));
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailetContainer#getMatchers()
- */
- public List<Matcher> getMatchers() {
- return Collections.unmodifiableList(matchers.get(processorName));
- }
-
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailetContainer#addListener(org.apache.james.mailetcontainer.api.MailetContainerListener)
- */
- public void addListener(MailetContainerListener listener) {
- listeners.add(listener);
- }
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailetContainer#removeListener(org.apache.james.mailetcontainer.api.MailetContainerListener)
- */
- public void removeListener(MailetContainerListener listener) {
- listeners.remove(listener);
- }
-
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.mailetcontainer.api.MailetContainer#getListeners()
- */
- public List<MailetContainerListener> getListeners() {
- return listeners;
- }
-
-
- /*
- * (non-Javadoc)
- * @see org.apache.james.lifecycle.api.Disposable#dispose()
- */
- public void dispose() {
- listeners.clear();
- if (jmxListener!= null) {
- jmxListener.dispose();
- }
- }
-
- public void registerJMX() throws MalformedObjectNameException, JMException {
- this.jmxListener = new JMXMailetContainerListener(processorName, ChildProcessor.this);
- addListener(jmxListener);
- }
- }
-
- /*
- * (non-Javadoc)
* @see org.apache.camel.CamelContextAware#getCamelContext()
*/
public CamelContext getCamelContext() {
@@ -444,294 +97,31 @@ public class CamelMailProcessorList impl
this.camelContext = camelContext;
}
-
- private final class SpoolRouteBuilder extends RouteBuilder {
- /*
- * (non-Javadoc)
- * @see org.apache.camel.builder.RouteBuilder#configure()
- */
- @SuppressWarnings("unchecked")
- @Override
- public void configure() throws Exception {
- Processor disposeProcessor = new DisposeProcessor();
- Processor mailProcessor = new MailCamelProcessor();
- Processor removePropsProcessor = new RemovePropertiesProcessor();
-
- List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
- for (int i = 0; i < processorConfs.size(); i++) {
- final HierarchicalConfiguration processorConf = processorConfs.get(i);
- String processorName = processorConf.getString("[@name]");
-
- ChildProcessor container = new ChildProcessor(processorName);
-
-
- if (processorName.equals(Mail.GHOST)) throw new ConfigurationException("ProcessorName of " + Mail.GHOST + " is reserved for internal use, choose a different name");
-
- mailets.put(processorName, new ArrayList<Mailet>());
- matchers.put(processorName, new ArrayList<Matcher>());
-
- RouteDefinition processorDef = from(getEndpoint(processorName)).routeId(processorName).inOnly()
- // store the logger in properties
- .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));
-
- // load composite matchers if there are any
- Map<String,Matcher> compositeMatchers = new HashMap<String, Matcher>();
- loadCompositeMatchers(processorName, compositeMatchers, processorConf.configurationsAt("matcher"));
-
-
- final List<HierarchicalConfiguration> mailetConfs = processorConf.configurationsAt("mailet");
-
- // Loop through the mailet configuration, load
- // all of the matcher and mailets, and add
- // them to the processor.
- for (int j = 0; j < mailetConfs.size(); j++) {
- HierarchicalConfiguration c = mailetConfs.get(j);
-
- // We need to set this because of correctly parsing comma
- String mailetClassName = c.getString("[@class]");
- String matcherName = c.getString("[@match]", null);
- String invertedMatcherName = c.getString("[@notmatch]", null);
-
- Mailet mailet = null;
- Matcher matcher = null;
- try {
-
- if (matcherName != null && invertedMatcherName != null) {
- // if no matcher is configured throw an Exception
- throw new ConfigurationException("Please configure only match or nomatch per mailet");
- } else if (matcherName != null) {
- // try to load from compositeMatchers first
- matcher = compositeMatchers.get(matcherName);
- if (matcher == null) {
- // no composite Matcher found, try to load it via MatcherLoader
- matcher = matcherLoader.getMatcher(createMatcherConfig(matcherName));
- }
- } else if (invertedMatcherName != null) {
- // try to load from compositeMatchers first
- matcher = compositeMatchers.get(matcherName);
- if (matcher == null) {
- // no composite Matcher found, try to load it via MatcherLoader
- matcher = matcherLoader.getMatcher(createMatcherConfig(invertedMatcherName));
- }
- matcher = new MatcherInverter(matcher);
-
- } else {
- // default matcher is All
- matcher = matcherLoader.getMatcher(createMatcherConfig("All"));
- }
-
- // The matcher itself should log that it's been inited.
- if (logger.isInfoEnabled()) {
- StringBuffer infoBuffer = new StringBuffer(64).append("Matcher ").append(matcherName).append(" instantiated.");
- logger.info(infoBuffer.toString());
- }
- } catch (MessagingException ex) {
- // **** Do better job printing out exception
- if (logger.isErrorEnabled()) {
- StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(": ").append(ex.toString());
- logger.error(errorBuffer.toString(), ex);
- if (ex.getNextException() != null) {
- logger.error("Caused by nested exception: ", ex.getNextException());
- }
- }
- System.err.println("Unable to init matcher " + matcherName);
- System.err.println("Check spool manager logs for more details.");
- // System.exit(1);
- throw new ConfigurationException("Unable to init matcher", ex);
- }
- try {
- mailet = mailetLoader.getMailet(createMailetConfig(mailetClassName, c));
- if (logger.isInfoEnabled()) {
- StringBuffer infoBuffer = new StringBuffer(64).append("Mailet ").append(mailetClassName).append(" instantiated.");
- logger.info(infoBuffer.toString());
- }
- } catch (MessagingException ex) {
- // **** Do better job printing out exception
- if (logger.isErrorEnabled()) {
- StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(": ").append(ex.toString());
- logger.error(errorBuffer.toString(), ex);
- if (ex.getNextException() != null) {
- logger.error("Caused by nested exception: ", ex.getNextException());
- }
- }
- System.err.println("Unable to init mailet " + mailetClassName);
- System.err.println("Check spool manager logs for more details.");
- throw new ConfigurationException("Unable to init mailet", ex);
- }
- if (mailet != null && matcher != null) {
-
- String onMatchException = null;
- MailetConfig mailetConfig = mailet.getMailetConfig();
-
- if (mailetConfig instanceof MailetConfigImpl) {
- onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
- }
-
- MailetProcessor mailetProccessor = new MailetProcessor(mailet, logger, container);
- // Store the matcher to use for splitter in properties
- processorDef
- .setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException)).setProperty(MatcherSplitter.MAILETCONTAINER_PROPERTY, constant(container))
-
- // do splitting of the mail based on the stored matcher
- .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
-
- .choice().when(new MatcherMatch()).process(mailetProccessor).end()
-
- .choice().when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
-
- .choice().when(new MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
-
- // store mailet and matcher
- mailets.get(processorName).add(mailet);
- matchers.get(processorName).add(matcher);
- }
-
-
- }
-
- Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger, container);
-
-
- processorDef
- // start choice
- .choice()
-
- // when the mail state did not change till yet ( the end of the route) we need to call the TerminatingMailet to
- // make sure we don't fall into a endless loop
- .when(new MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
-
-
- // dispose when needed
- .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
-
- // route it to the next processor
- .otherwise().process(mailProcessor).stop();
-
-
-
- processors.put(processorName, container);
- if (enableJmx) {
- container.registerJMX();
- }
- }
-
- // check if all needed processors are configured
- checkProcessors();
-
- }
-
- }
-
-
- private MailetConfig createMailetConfig(String mailetName, HierarchicalConfiguration configuration) {
-
- final MailetConfigImpl configImpl = new MailetConfigImpl();
- configImpl.setMailetName(mailetName);
- configImpl.setConfiguration(configuration);
- configImpl.setMailetContext(mailetContext);
- return configImpl;
- }
-
- private MatcherConfig createMatcherConfig(String matchName) {
-
- String condition = (String) null;
- int i = matchName.indexOf('=');
- if (i != -1) {
- condition = matchName.substring(i + 1);
- matchName = matchName.substring(0, i);
- }
- final MatcherConfigImpl configImpl = new MatcherConfigImpl();
- configImpl.setMatcherName(matchName);
- configImpl.setCondition(condition);
- configImpl.setMailetContext(mailetContext);
- return configImpl;
-
- }
-
- /**
- * Check if all needed Processors are configured and if not throw a {@link ConfigurationException}
- *
- * @throws ConfigurationException
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.lib.AbstractMailetProcessorList#createMailetContainer(java.lang.String, org.apache.commons.configuration.HierarchicalConfiguration)
*/
- private void checkProcessors() throws ConfigurationException {
- boolean errorProcessorFound = false;
- boolean rootProcessorFound = false;
- Iterator<String> names = processors.keySet().iterator();
- while(names.hasNext()) {
- String name = names.next();
- if (name.equals(Mail.DEFAULT)) {
- rootProcessorFound = true;
- } else if (name.equals(Mail.ERROR)) {
- errorProcessorFound = true;
- }
-
- if (errorProcessorFound && rootProcessorFound) {
- return;
- }
- }
- if (errorProcessorFound == false) {
- throw new ConfigurationException("You need to configure a Processor with name " + Mail.ERROR);
- } else if (rootProcessorFound == false) {
- throw new ConfigurationException("You need to configure a Processor with name " + Mail.DEFAULT);
- }
+ protected MailetContainer createMailetContainer(String name, HierarchicalConfiguration config) throws Exception{
+ CamelMailetContainer container = new CamelMailetContainer();
+ container.setLog(logger);
+ container.setCamelContext(camelContext);
+ container.setMailetContext(mailetContext);
+ container.setMailetLoader(mailetLoader);
+ container.setMatcherLoader(matcherLoader);
+ container.configure(config);
+ container.init();
+ return container;
}
- /**
- * Load {@link CompositeMatcher} implementations and their child {@link Matcher}'s
- *
- * CompositeMatcher were added by JAMES-948
- *
- * @param processorName
- * @param compMap
- * @param compMatcherConfs
- * @return compositeMatchers
- * @throws ConfigurationException
- * @throws MessagingException
- * @throws NotCompliantMBeanException
- */
- @SuppressWarnings("unchecked")
- private List<Matcher> loadCompositeMatchers(String processorName, Map<String,Matcher> compMap, List<HierarchicalConfiguration> compMatcherConfs) throws ConfigurationException, MessagingException, NotCompliantMBeanException {
- List<Matcher> matchers = new ArrayList<Matcher>();
-
- for (int j= 0 ; j < compMatcherConfs.size(); j++) {
- HierarchicalConfiguration c = compMatcherConfs.get(j);
- String compName = c.getString("[@name]", null);
- String matcherName = c.getString("[@match]", null);
- String invertedMatcherName = c.getString("[@notmatch]", null);
-
- Matcher matcher = null;
- if (matcherName != null && invertedMatcherName != null) {
- // if no matcher is configured throw an Exception
- throw new ConfigurationException("Please configure only match or nomatch per mailet");
- } else if (matcherName != null) {
- matcher = matcherLoader.getMatcher(createMatcherConfig(matcherName));
- if (matcher instanceof CompositeMatcher) {
- CompositeMatcher compMatcher = (CompositeMatcher) matcher;
-
- List<Matcher> childMatcher = loadCompositeMatchers(processorName, compMap,c.configurationsAt("matcher"));
- for (int i = 0 ; i < childMatcher.size(); i++) {
- compMatcher.add(childMatcher.get(i));
- }
- }
- } else if (invertedMatcherName != null) {
- Matcher m = matcherLoader.getMatcher(createMatcherConfig(invertedMatcherName));
- if (m instanceof CompositeMatcher) {
- CompositeMatcher compMatcher = (CompositeMatcher) m;
-
- List<Matcher> childMatcher = loadCompositeMatchers(processorName, compMap,c.configurationsAt("matcher"));
- for (int i = 0 ; i < childMatcher.size(); i++) {
- compMatcher.add(childMatcher.get(i));
- }
- }
- matcher = new MatcherInverter(m);
- }
- if (matcher == null) throw new ConfigurationException("Unable to load matcher instance");
- matchers.add(matcher);
- if (compName != null) {
- // check if there is already a composite Matcher with the name registered in the processor
- if (compMap.containsKey(compName)) throw new ConfigurationException("CompositeMatcher with name " + compName + " is already defined in processor " + processorName);
- compMap.put(compName, matcher);
- }
+
+ @Override
+ public void dispose() {
+ String names[] = getProcessorNames();
+ for (int i = 0; i < names.length; i++) {
+ CamelMailetContainer container = (CamelMailetContainer) getProcessor(names[i]);
+ container.destroy();
}
- return matchers;
+ super.dispose();
}
+
+
}
Added: james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailetContainer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailetContainer.java?rev=1051556&view=auto
==============================================================================
--- james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailetContainer.java (added)
+++ james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailetContainer.java Tue Dec 21 16:59:52 2010
@@ -0,0 +1,548 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.mailetcontainer.camel;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import javax.mail.MessagingException;
+import javax.management.NotCompliantMBeanException;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.HierarchicalConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.james.lifecycle.api.Configurable;
+import org.apache.james.lifecycle.api.LogEnabled;
+import org.apache.james.mailetcontainer.api.MailetContainer;
+import org.apache.james.mailetcontainer.api.MailetContainerListener;
+import org.apache.james.mailetcontainer.api.MailetLoader;
+import org.apache.james.mailetcontainer.api.MatcherLoader;
+import org.apache.james.mailetcontainer.lib.MailetConfigImpl;
+import org.apache.james.mailetcontainer.lib.MatcherConfigImpl;
+import org.apache.james.mailetcontainer.lib.jmx.JMXMailetContainerListener;
+import org.apache.james.mailetcontainer.lib.matchers.CompositeMatcher;
+import org.apache.mailet.Mail;
+import org.apache.mailet.Mailet;
+import org.apache.mailet.MailetConfig;
+import org.apache.mailet.MailetContext;
+import org.apache.mailet.Matcher;
+import org.apache.mailet.MatcherConfig;
+import org.apache.mailet.base.GenericMailet;
+import org.apache.mailet.base.MatcherInverter;
+
+public class CamelMailetContainer implements MailetContainer, Configurable, LogEnabled, CamelContextAware{
+
+ private Log logger;
+ private CamelContext context;
+ private String processorName;
+ private JMXMailetContainerListener jmxListener;
+ private List<MailetContainerListener> listeners = Collections.synchronizedList(new ArrayList<MailetContainerListener>());
+ private boolean enableJmx = true;
+ private List<Matcher> matchers = new ArrayList<Matcher>();
+ private List<Mailet> mailets = new ArrayList<Mailet>();
+ private ProducerTemplate producerTemplate;
+ private HierarchicalConfiguration config;
+
+ private final UseLatestAggregationStrategy aggr = new UseLatestAggregationStrategy();
+ private MailetContext mailetContext;
+ private MatcherLoader matcherLoader;
+ private MailetLoader mailetLoader;
+
+ @Resource(name = "matcherloader")
+ public void setMatcherLoader(MatcherLoader matcherLoader) {
+ this.matcherLoader = matcherLoader;
+ }
+
+ @Resource(name = "mailetloader")
+ public void setMailetLoader(MailetLoader mailetLoader) {
+ this.mailetLoader = mailetLoader;
+ }
+
+ @Resource(name= "mailetcontext")
+ public void setMailetContext(MailetContext mailetContext) {
+ this.mailetContext = mailetContext;
+ }
+
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailProcessor#service(org.apache.mailet.Mail)
+ */
+ public void service(Mail mail) throws MessagingException {
+ try {
+ producerTemplate.sendBody(getEndpoint(), mail);
+
+ } catch (CamelExecutionException ex) {
+ throw new MessagingException("Unable to process mail " + mail.getName(),ex);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailetContainer#getMailets()
+ */
+ public List<Mailet> getMailets() {
+ return Collections.unmodifiableList(mailets);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailetContainer#getMatchers()
+ */
+ public List<Matcher> getMatchers() {
+ return Collections.unmodifiableList(matchers);
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailetContainer#addListener(org.apache.james.mailetcontainer.api.MailetContainerListener)
+ */
+ public void addListener(MailetContainerListener listener) {
+ listeners.add(listener);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailetContainer#removeListener(org.apache.james.mailetcontainer.api.MailetContainerListener)
+ */
+ public void removeListener(MailetContainerListener listener) {
+ listeners.remove(listener);
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailetContainer#getListeners()
+ */
+ public List<MailetContainerListener> getListeners() {
+ return listeners;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.camel.CamelContextAware#getCamelContext()
+ */
+ public CamelContext getCamelContext() {
+ return context;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.camel.CamelContextAware#setCamelContext(org.apache.camel.CamelContext)
+ */
+ public void setCamelContext(CamelContext context) {
+ this.context = context;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.lifecycle.api.LogEnabled#setLog(org.apache.commons.logging.Log)
+ */
+ public void setLog(Log log) {
+ this.logger = log;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.lifecycle.api.Configurable#configure(org.apache.commons.configuration.HierarchicalConfiguration)
+ */
+ public void configure(HierarchicalConfiguration config) throws ConfigurationException {
+ this.config = config;
+ this.processorName = config.getString("[@name]", null);
+ if (processorName == null) throw new ConfigurationException("Processor name must be configured");
+ this.enableJmx = config.getBoolean("[@enableJmx]", true);
+
+ }
+
+ /**
+ * Return the endpoint for the processorname.
+ *
+ * This will return a "direct" endpoint.
+ *
+ * @param processorName
+ * @return endPoint
+ */
+ protected String getEndpoint() {
+ return "direct:processor." + processorName;
+ }
+
+
+ @PostConstruct
+ public void init() throws Exception {
+ context.addRoutes(new MailetContainerRouteBuilder());
+
+
+ producerTemplate = context.createProducerTemplate();
+ if (enableJmx) {
+ this.jmxListener = new JMXMailetContainerListener(processorName, this);
+ addListener(jmxListener);
+ }
+ }
+
+ @PreDestroy
+ public void destroy() {
+ listeners.clear();
+ if (enableJmx && jmxListener != null) {
+ jmxListener.dispose();
+ }
+
+ for (int i = 0; i < mailets.size(); i++) {
+ Mailet m = mailets.get(i);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Shutdown mailet " + m.getMailetInfo());
+ }
+ m.destroy();
+
+ }
+
+ for (int i = 0; i < matchers.size(); i++) {
+ Matcher m = matchers.get(i);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Shutdown matcher " + m.getMatcherInfo());
+ }
+ m.destroy();
+ }
+
+ }
+
+
+
+ private final class MailetContainerRouteBuilder extends RouteBuilder {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure() throws Exception {
+ Processor disposeProcessor = new DisposeProcessor();
+ Processor removePropsProcessor = new RemovePropertiesProcessor();
+ Processor completeProcessor = new CompleteProcessor();
+
+ if (processorName.equals(Mail.GHOST)) throw new ConfigurationException("ProcessorName of " + Mail.GHOST + " is reserved for internal use, choose a different name");
+
+
+ RouteDefinition processorDef = from(getEndpoint()).routeId(processorName).inOnly()
+ // store the logger in properties
+ .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));
+
+ // load composite matchers if there are any
+ Map<String,Matcher> compositeMatchers = new HashMap<String, Matcher>();
+ loadCompositeMatchers(processorName, compositeMatchers, config.configurationsAt("matcher"));
+
+
+ final List<HierarchicalConfiguration> mailetConfs = config.configurationsAt("mailet");
+
+ // Loop through the mailet configuration, load
+ // all of the matcher and mailets, and add
+ // them to the processor.
+ for (int j = 0; j < mailetConfs.size(); j++) {
+ HierarchicalConfiguration c = mailetConfs.get(j);
+
+ // We need to set this because of correctly parsing comma
+ String mailetClassName = c.getString("[@class]");
+ String matcherName = c.getString("[@match]", null);
+ String invertedMatcherName = c.getString("[@notmatch]", null);
+
+ Mailet mailet = null;
+ Matcher matcher = null;
+ try {
+
+ if (matcherName != null && invertedMatcherName != null) {
+ // if no matcher is configured throw an Exception
+ throw new ConfigurationException("Please configure only match or nomatch per mailet");
+ } else if (matcherName != null) {
+ // try to load from compositeMatchers first
+ matcher = compositeMatchers.get(matcherName);
+ if (matcher == null) {
+ // no composite Matcher found, try to load it via MatcherLoader
+ matcher = matcherLoader.getMatcher(createMatcherConfig(matcherName));
+ }
+ } else if (invertedMatcherName != null) {
+ // try to load from compositeMatchers first
+ matcher = compositeMatchers.get(matcherName);
+ if (matcher == null) {
+ // no composite Matcher found, try to load it via MatcherLoader
+ matcher = matcherLoader.getMatcher(createMatcherConfig(invertedMatcherName));
+ }
+ matcher = new MatcherInverter(matcher);
+
+ } else {
+ // default matcher is All
+ matcher = matcherLoader.getMatcher(createMatcherConfig("All"));
+ }
+
+ // The matcher itself should log that it's been inited.
+ if (logger.isInfoEnabled()) {
+ StringBuffer infoBuffer = new StringBuffer(64).append("Matcher ").append(matcherName).append(" instantiated.");
+ logger.info(infoBuffer.toString());
+ }
+ } catch (MessagingException ex) {
+ // **** Do better job printing out exception
+ if (logger.isErrorEnabled()) {
+ StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(": ").append(ex.toString());
+ logger.error(errorBuffer.toString(), ex);
+ if (ex.getNextException() != null) {
+ logger.error("Caused by nested exception: ", ex.getNextException());
+ }
+ }
+ System.err.println("Unable to init matcher " + matcherName);
+ System.err.println("Check spool manager logs for more details.");
+ // System.exit(1);
+ throw new ConfigurationException("Unable to init matcher", ex);
+ }
+ try {
+ mailet = mailetLoader.getMailet(createMailetConfig(mailetClassName, c));
+ if (logger.isInfoEnabled()) {
+ StringBuffer infoBuffer = new StringBuffer(64).append("Mailet ").append(mailetClassName).append(" instantiated.");
+ logger.info(infoBuffer.toString());
+ }
+ } catch (MessagingException ex) {
+ // **** Do better job printing out exception
+ if (logger.isErrorEnabled()) {
+ StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(": ").append(ex.toString());
+ logger.error(errorBuffer.toString(), ex);
+ if (ex.getNextException() != null) {
+ logger.error("Caused by nested exception: ", ex.getNextException());
+ }
+ }
+ System.err.println("Unable to init mailet " + mailetClassName);
+ System.err.println("Check spool manager logs for more details.");
+ throw new ConfigurationException("Unable to init mailet", ex);
+ }
+ if (mailet != null && matcher != null) {
+
+ String onMatchException = null;
+ MailetConfig mailetConfig = mailet.getMailetConfig();
+
+ if (mailetConfig instanceof MailetConfigImpl) {
+ onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
+ }
+
+ MailetProcessor mailetProccessor = new MailetProcessor(mailet, logger, CamelMailetContainer.this);
+ // Store the matcher to use for splitter in properties
+ processorDef
+ .setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException)).setProperty(MatcherSplitter.MAILETCONTAINER_PROPERTY, constant(CamelMailetContainer.this))
+
+ // do splitting of the mail based on the stored matcher
+ .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
+
+ .choice().when(new MatcherMatch()).process(mailetProccessor).end()
+
+ .choice().when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
+
+ .choice().when(new MailStateNotEquals(processorName)).process(completeProcessor).stop().end();
+
+ // store mailet and matcher
+ mailets.add(mailet);
+ matchers.add(matcher);
+ }
+
+
+ }
+
+ Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger, CamelMailetContainer.this);
+
+
+ processorDef
+ // start choice
+ .choice()
+
+ // when the mail state did not change till yet ( the end of the route) we need to call the TerminatingMailet to
+ // make sure we don't fall into a endless loop
+ .when(new MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
+
+
+ // dispose when needed
+ .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
+
+ // this container is complete
+ .otherwise().process(completeProcessor).stop();
+
+
+ }
+
+
+ private MailetConfig createMailetConfig(String mailetName, HierarchicalConfiguration configuration) {
+
+ final MailetConfigImpl configImpl = new MailetConfigImpl();
+ configImpl.setMailetName(mailetName);
+ configImpl.setConfiguration(configuration);
+ configImpl.setMailetContext(mailetContext);
+ return configImpl;
+ }
+
+ private MatcherConfig createMatcherConfig(String matchName) {
+
+ String condition = (String) null;
+ int i = matchName.indexOf('=');
+ if (i != -1) {
+ condition = matchName.substring(i + 1);
+ matchName = matchName.substring(0, i);
+ }
+ final MatcherConfigImpl configImpl = new MatcherConfigImpl();
+ configImpl.setMatcherName(matchName);
+ configImpl.setCondition(condition);
+ configImpl.setMailetContext(mailetContext);
+ return configImpl;
+
+ }
+
+ /**
+ * Load {@link CompositeMatcher} implementations and their child {@link Matcher}'s
+ *
+ * CompositeMatcher were added by JAMES-948
+ *
+ * @param processorName
+ * @param compMap
+ * @param compMatcherConfs
+ * @return compositeMatchers
+ * @throws ConfigurationException
+ * @throws MessagingException
+ * @throws NotCompliantMBeanException
+ */
+ @SuppressWarnings("unchecked")
+ private List<Matcher> loadCompositeMatchers(String processorName, Map<String,Matcher> compMap, List<HierarchicalConfiguration> compMatcherConfs) throws ConfigurationException, MessagingException, NotCompliantMBeanException {
+ List<Matcher> matchers = new ArrayList<Matcher>();
+
+ for (int j= 0 ; j < compMatcherConfs.size(); j++) {
+ HierarchicalConfiguration c = compMatcherConfs.get(j);
+ String compName = c.getString("[@name]", null);
+ String matcherName = c.getString("[@match]", null);
+ String invertedMatcherName = c.getString("[@notmatch]", null);
+
+ Matcher matcher = null;
+ if (matcherName != null && invertedMatcherName != null) {
+ // if no matcher is configured throw an Exception
+ throw new ConfigurationException("Please configure only match or nomatch per mailet");
+ } else if (matcherName != null) {
+ matcher = matcherLoader.getMatcher(createMatcherConfig(matcherName));
+ if (matcher instanceof CompositeMatcher) {
+ CompositeMatcher compMatcher = (CompositeMatcher) matcher;
+
+ List<Matcher> childMatcher = loadCompositeMatchers(processorName, compMap,c.configurationsAt("matcher"));
+ for (int i = 0 ; i < childMatcher.size(); i++) {
+ compMatcher.add(childMatcher.get(i));
+ }
+ }
+ } else if (invertedMatcherName != null) {
+ Matcher m = matcherLoader.getMatcher(createMatcherConfig(invertedMatcherName));
+ if (m instanceof CompositeMatcher) {
+ CompositeMatcher compMatcher = (CompositeMatcher) m;
+
+ List<Matcher> childMatcher = loadCompositeMatchers(processorName, compMap,c.configurationsAt("matcher"));
+ for (int i = 0 ; i < childMatcher.size(); i++) {
+ compMatcher.add(childMatcher.get(i));
+ }
+ }
+ matcher = new MatcherInverter(m);
+ }
+ if (matcher == null) throw new ConfigurationException("Unable to load matcher instance");
+ matchers.add(matcher);
+ if (compName != null) {
+ // check if there is already a composite Matcher with the name registered in the processor
+ if (compMap.containsKey(compName)) throw new ConfigurationException("CompositeMatcher with name " + compName + " is already defined in processor " + processorName);
+ compMap.put(compName, matcher);
+ }
+ }
+ return matchers;
+ }
+
+
+ private final class RemovePropertiesProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ exchange.removeProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY);
+ exchange.removeProperty(MatcherSplitter.MATCHER_PROPERTY);
+ }
+ }
+
+ /**
+ * Mailet which protect us to not fall into an endless loop caused by an configuration error
+ *
+ */
+ private final class TerminatingMailet extends GenericMailet {
+ /**
+ * The name of the mailet used to terminate the mailet chain. The
+ * end of the matcher/mailet chain must be a matcher that matches
+ * all mails and a mailet that sets every mail to GHOST status.
+ * This is necessary to ensure that mails are removed from the spool
+ * in an orderly fashion.
+ */
+ private static final String TERMINATING_MAILET_NAME = "Terminating%Mailet%Name";
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.mailet.base.GenericMailet#service(org.apache.mailet.Mail)
+ */
+ public void service(Mail mail) {
+ if (!(Mail.ERROR.equals(mail.getState()))) {
+ // Don't complain if we fall off the end of the
+ // error processor. That is currently the
+ // normal situation for James, and the message
+ // will show up in the error store.
+ StringBuffer warnBuffer = new StringBuffer(256)
+ .append("Message ")
+ .append(mail.getName())
+ .append(" reached the end of this processor, and is automatically deleted. This may indicate a configuration error.");
+ logger.warn(warnBuffer.toString());
+ }
+
+ // Set the mail to ghost state
+ mail.setState(Mail.GHOST);
+ }
+
+ @Override
+ public String getMailetInfo() {
+ return getMailetName();
+ }
+
+ @Override
+ public String getMailetName() {
+ return TERMINATING_MAILET_NAME;
+ }
+ }
+
+ private final class CompleteProcessor implements Processor {
+
+ public void process(Exchange ex) throws Exception {
+ logger.debug("End of mailetcontainer" + processorName + " reached");
+ ex.setProperty(Exchange.ROUTE_STOP, true);
+ }
+ }
+
+ }
+
+}
Added: james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/AbstractMailProcessorList.java
URL: http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/AbstractMailProcessorList.java?rev=1051556&view=auto
==============================================================================
--- james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/AbstractMailProcessorList.java (added)
+++ james/server/trunk/mailetcontainer-library/src/main/java/org/apache/james/mailetcontainer/lib/AbstractMailProcessorList.java Tue Dec 21 16:59:52 2010
@@ -0,0 +1,207 @@
+package org.apache.james.mailetcontainer.lib;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.mail.MessagingException;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.HierarchicalConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.james.lifecycle.api.Configurable;
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.lifecycle.api.LogEnabled;
+import org.apache.james.mailetcontainer.api.MailProcessor;
+import org.apache.james.mailetcontainer.api.MailProcessorList;
+import org.apache.james.mailetcontainer.api.MailProcessorListListener;
+import org.apache.james.mailetcontainer.api.MailetContainer;
+import org.apache.james.mailetcontainer.api.jmx.ProcessorManagementMBean;
+import org.apache.james.mailetcontainer.lib.jmx.JMXMailProcessorListListener;
+import org.apache.mailet.Mail;
+
+public abstract class AbstractMailProcessorList implements MailProcessorList, Configurable, LogEnabled, ProcessorManagementMBean{
+
+ private List<MailProcessorListListener> listeners = Collections.synchronizedList(new ArrayList<MailProcessorListListener>());
+ private final Map<String,MailProcessor> processors = new HashMap<String,MailProcessor>();
+ protected Log logger;
+ protected HierarchicalConfiguration config;
+
+ private JMXMailProcessorListListener jmxListener;
+ private boolean enableJmx = true;
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
+ */
+ public void setLog(Log log) {
+ this.logger = log;
+
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailProcessorList#addListener(org.apache.james.mailetcontainer.api.MailProcessorListListener)
+ */
+ public void addListener(MailProcessorListListener listener) {
+ listeners.add(listener);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailProcessorList#getListeners()
+ */
+ public List<MailProcessorListListener> getListeners() {
+ return listeners;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailProcessorList#removeListener(org.apache.james.mailetcontainer.api.MailProcessorListListener)
+ */
+ public void removeListener(MailProcessorListListener listener) {
+ listeners.remove(listener);
+ }
+
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.lifecycle.Configurable#configure(org.apache.commons.configuration.HierarchicalConfiguration)
+ */
+ public void configure(HierarchicalConfiguration config) throws ConfigurationException {
+ this.config = config;
+ this.enableJmx = config.getBoolean("[@enableJmx]", true);
+
+ }
+
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.api.MailProcessor#service(org.apache.mailet.Mail)
+ */
+ public void service(Mail mail) throws MessagingException {
+ long start = System.currentTimeMillis();
+ MessagingException ex = null;
+ MailProcessor processor = getProcessor(mail.getState());
+
+ if (processor != null) {
+ logger.info("Call MailProcessor " + mail.getState());
+ try {
+ processor.service(mail);
+
+ // check the mail needs further processing
+ if (Mail.GHOST.equalsIgnoreCase(mail.getState()) == false) {
+ service(mail);
+ } else {
+ LifecycleUtil.dispose(mail);
+ }
+
+ } catch (MessagingException e) {
+ ex = e;
+ throw e;
+ } finally {
+ long end = System.currentTimeMillis() - start;
+ for (int i = 0; i < listeners.size(); i++) {
+ MailProcessorListListener listener = listeners.get(i);
+
+ listener.afterProcessor(processor, mail.getName(), end, ex);
+ }
+ }
+ } else {
+ throw new MessagingException("No processor found for mail " + mail.getName() + " with state " + mail.getState());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.transport.ProcessorList#getProcessor(java.lang.String)
+ */
+ public MailProcessor getProcessor(String name) {
+ return processors.get(name);
+ }
+
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.mailetcontainer.MailProcessorList#getProcessorNames()
+ */
+ public String[] getProcessorNames() {
+ return processors.keySet().toArray(new String[processors.size()]);
+ }
+
+ /**
+ * Check if all needed Processors are configured and if not throw a {@link ConfigurationException}
+ *
+ * @throws ConfigurationException
+ */
+ private void checkProcessors() throws ConfigurationException {
+ boolean errorProcessorFound = false;
+ boolean rootProcessorFound = false;
+ Iterator<String> names = processors.keySet().iterator();
+ while(names.hasNext()) {
+ String name = names.next();
+ if (name.equals(Mail.DEFAULT)) {
+ rootProcessorFound = true;
+ } else if (name.equals(Mail.ERROR)) {
+ errorProcessorFound = true;
+ }
+
+ if (errorProcessorFound && rootProcessorFound) {
+ return;
+ }
+ }
+ if (errorProcessorFound == false) {
+ throw new ConfigurationException("You need to configure a Processor with name " + Mail.ERROR);
+ } else if (rootProcessorFound == false) {
+ throw new ConfigurationException("You need to configure a Processor with name " + Mail.DEFAULT);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @PostConstruct
+ public void init() throws Exception {
+ List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
+ for (int i = 0; i < processorConfs.size(); i++) {
+ final HierarchicalConfiguration processorConf = processorConfs.get(i);
+ String processorName = processorConf.getString("[@name]");
+
+ processors.put(processorName, createMailetContainer(processorName, processorConf));
+ }
+
+
+ if (enableJmx) {
+ this.jmxListener = new JMXMailProcessorListListener(this);
+ addListener(jmxListener);
+ }
+
+ // check if all needed processors are configured
+ checkProcessors();
+ }
+
+ @PreDestroy
+ public void dispose() {
+
+ if (jmxListener != null) {
+ jmxListener.dispose();
+ }
+ }
+
+ /**
+ * Create a new {@link MailetContainer}
+ *
+ * @param name
+ * @param config
+ * @return container
+ * @throws Exception
+ */
+ protected abstract MailetContainer createMailetContainer(String name, HierarchicalConfiguration config) throws Exception;
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org