You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/10/06 14:15:05 UTC
svn commit: r1005005 - in /james/server/trunk:
mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/
spoolmanager/src/main/java/org/apache/james/transport/
spring-deployment/src/main/config/james/
Author: norman
Date: Wed Oct 6 12:15:05 2010
New Revision: 1005005
URL: http://svn.apache.org/viewvc?rev=1005005&view=rev
Log:
* Use amq: namespace to configure it via spring, so no need to activemq.xml.
* Some more fine tuning to make sure that JamesSpoolManager is really thread-safe on shutdown
Removed:
james/server/trunk/spring-deployment/src/main/config/james/activemq.xml
Modified:
james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
Modified: james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
URL: http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java (original)
+++ james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java Wed Oct 6 12:15:05 2010
@@ -25,10 +25,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import javax.mail.MessagingException;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -60,7 +63,7 @@ import org.apache.mailet.base.MatcherInv
* It also offer the {@link MailProcessorList} implementation which allow to inject {@link Mail} into the routes
*
*/
-public class CamelMailProcessorList extends RouteBuilder implements Configurable, LogEnabled, MailProcessorList {
+public class CamelMailProcessorList implements Configurable, LogEnabled, MailProcessorList, CamelContextAware {
private MatcherLoader matcherLoader;
private HierarchicalConfiguration config;
@@ -83,150 +86,16 @@ public class CamelMailProcessorList exte
}
private ProducerTemplate producerTemplate;
- /*
- * (non-Javadoc)
- * @see org.apache.camel.builder.RouteBuilder#configure()
- */
- @SuppressWarnings("unchecked")
- @Override
- public void configure() throws Exception {
- Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
- Processor disposeProcessor = new DisposeProcessor();
- Processor mailProcessor = new MailCamelProcessor();
- Processor removePropsProcessor = new RemovePropertiesProcessor();
-
- List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
- for (int i = 0; i < processorConfs.size(); i++) {
- final HierarchicalConfiguration processorConf = processorConfs.get(i);
- String processorName = processorConf.getString("[@name]");
-
-
- mailets.put(processorName, new ArrayList<Mailet>());
- matchers.put(processorName, new ArrayList<Matcher>());
-
- RouteDefinition processorDef = from(getEndpoint(processorName)).inOnly()
- // store the logger in properties
- .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));
-
-
- final List<HierarchicalConfiguration> mailetConfs = processorConf.configurationsAt("mailet");
- // Loop through the mailet configuration, load
- // all of the matcher and mailets, and add
- // them to the processor.
- for (int j = 0; j < mailetConfs.size(); j++) {
- HierarchicalConfiguration c = mailetConfs.get(j);
-
- // We need to set this because of correctly parsing comma
- String mailetClassName = c.getString("[@class]");
- String matcherName = c.getString("[@match]", null);
- String invertedMatcherName = c.getString("[@notmatch]", null);
-
- Mailet mailet = null;
- Matcher matcher = null;
- try {
-
- if (matcherName != null && invertedMatcherName != null) {
- // if no matcher is configured throw an Exception
- throw new ConfigurationException("Please configure only match or nomatch per mailet");
- } else if (matcherName != null) {
- matcher = matcherLoader.getMatcher(matcherName);
- } else if (invertedMatcherName != null) {
- matcher = new MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
-
- } else {
- // default matcher is All
- matcher = matcherLoader.getMatcher("All");
- }
-
- // The matcher itself should log that it's been inited.
- if (logger.isInfoEnabled()) {
- StringBuffer infoBuffer = new StringBuffer(64).append("Matcher ").append(matcherName).append(" instantiated.");
- logger.info(infoBuffer.toString());
- }
- } catch (MessagingException ex) {
- // **** Do better job printing out exception
- if (logger.isErrorEnabled()) {
- StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(": ").append(ex.toString());
- logger.error(errorBuffer.toString(), ex);
- if (ex.getNextException() != null) {
- logger.error("Caused by nested exception: ", ex.getNextException());
- }
- }
- System.err.println("Unable to init matcher " + matcherName);
- System.err.println("Check spool manager logs for more details.");
- // System.exit(1);
- throw new ConfigurationException("Unable to init matcher", ex);
- }
- try {
- mailet = mailetLoader.getMailet(mailetClassName, c);
- if (logger.isInfoEnabled()) {
- StringBuffer infoBuffer = new StringBuffer(64).append("Mailet ").append(mailetClassName).append(" instantiated.");
- logger.info(infoBuffer.toString());
- }
- } catch (MessagingException ex) {
- // **** Do better job printing out exception
- if (logger.isErrorEnabled()) {
- StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(": ").append(ex.toString());
- logger.error(errorBuffer.toString(), ex);
- if (ex.getNextException() != null) {
- logger.error("Caused by nested exception: ", ex.getNextException());
- }
- }
- System.err.println("Unable to init mailet " + mailetClassName);
- System.err.println("Check spool manager logs for more details.");
- throw new ConfigurationException("Unable to init mailet", ex);
- }
- if (mailet != null && matcher != null) {
- String onMatchException = null;
- MailetConfig mailetConfig = mailet.getMailetConfig();
-
- if (mailetConfig instanceof MailetConfigImpl) {
- onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
- }
-
- // Store the matcher to use for splitter in properties
- processorDef
- .setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException))
-
- // do splitting of the mail based on the stored matcher
- .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
-
- .choice().when(new MatcherMatch()).process(new MailetProcessor(mailet, logger)).end()
-
- .choice().when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
-
- .choice().when(new MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
-
- // store mailet and matcher
- mailets.get(processorName).add(mailet);
- matchers.get(processorName).add(matcher);
- }
-
+ private CamelContext camelContext;
+
- }
-
- processorDef
- // start choice
- .choice()
-
- // when the mail state did not change till yet ( the end of the route) we need to call the TerminatingMailet to
- // make sure we don't fall into a endless loop
- .when(new MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
-
-
- // dispose when needed
- .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
-
- // route it to the next processor
- .otherwise().process(mailProcessor).stop();
-
- processors.put(processorName, new ChildMailProcessor(processorName));
- }
-
- producerTemplate = getContext().createProducerTemplate();
- }
+ @PostConstruct
+ public void init() throws Exception {
+ getCamelContext().addRoutes(new SpoolRouteBuilder());
+ producerTemplate = getCamelContext().createProducerTemplate();
+ }
/**
* Destroy all mailets and matchers
*/
@@ -428,4 +297,156 @@ public class CamelMailProcessorList exte
}
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
+ private final class SpoolRouteBuilder extends RouteBuilder {
+ /*
+ * (non-Javadoc)
+ * @see org.apache.camel.builder.RouteBuilder#configure()
+ */
+ @SuppressWarnings("unchecked")
+ @Override
+ public void configure() throws Exception {
+ Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
+ Processor disposeProcessor = new DisposeProcessor();
+ Processor mailProcessor = new MailCamelProcessor();
+ Processor removePropsProcessor = new RemovePropertiesProcessor();
+
+ List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
+ for (int i = 0; i < processorConfs.size(); i++) {
+ final HierarchicalConfiguration processorConf = processorConfs.get(i);
+ String processorName = processorConf.getString("[@name]");
+
+
+ mailets.put(processorName, new ArrayList<Mailet>());
+ matchers.put(processorName, new ArrayList<Matcher>());
+
+ RouteDefinition processorDef = from(getEndpoint(processorName)).inOnly()
+ // store the logger in properties
+ .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));
+
+
+ final List<HierarchicalConfiguration> mailetConfs = processorConf.configurationsAt("mailet");
+ // Loop through the mailet configuration, load
+ // all of the matcher and mailets, and add
+ // them to the processor.
+ for (int j = 0; j < mailetConfs.size(); j++) {
+ HierarchicalConfiguration c = mailetConfs.get(j);
+
+ // We need to set this because of correctly parsing comma
+ String mailetClassName = c.getString("[@class]");
+ String matcherName = c.getString("[@match]", null);
+ String invertedMatcherName = c.getString("[@notmatch]", null);
+
+ Mailet mailet = null;
+ Matcher matcher = null;
+ try {
+
+ if (matcherName != null && invertedMatcherName != null) {
+ // if no matcher is configured throw an Exception
+ throw new ConfigurationException("Please configure only match or nomatch per mailet");
+ } else if (matcherName != null) {
+ matcher = matcherLoader.getMatcher(matcherName);
+ } else if (invertedMatcherName != null) {
+ matcher = new MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
+
+ } else {
+ // default matcher is All
+ matcher = matcherLoader.getMatcher("All");
+ }
+
+ // The matcher itself should log that it's been inited.
+ if (logger.isInfoEnabled()) {
+ StringBuffer infoBuffer = new StringBuffer(64).append("Matcher ").append(matcherName).append(" instantiated.");
+ logger.info(infoBuffer.toString());
+ }
+ } catch (MessagingException ex) {
+ // **** Do better job printing out exception
+ if (logger.isErrorEnabled()) {
+ StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(": ").append(ex.toString());
+ logger.error(errorBuffer.toString(), ex);
+ if (ex.getNextException() != null) {
+ logger.error("Caused by nested exception: ", ex.getNextException());
+ }
+ }
+ System.err.println("Unable to init matcher " + matcherName);
+ System.err.println("Check spool manager logs for more details.");
+ // System.exit(1);
+ throw new ConfigurationException("Unable to init matcher", ex);
+ }
+ try {
+ mailet = mailetLoader.getMailet(mailetClassName, c);
+ if (logger.isInfoEnabled()) {
+ StringBuffer infoBuffer = new StringBuffer(64).append("Mailet ").append(mailetClassName).append(" instantiated.");
+ logger.info(infoBuffer.toString());
+ }
+ } catch (MessagingException ex) {
+ // **** Do better job printing out exception
+ if (logger.isErrorEnabled()) {
+ StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(": ").append(ex.toString());
+ logger.error(errorBuffer.toString(), ex);
+ if (ex.getNextException() != null) {
+ logger.error("Caused by nested exception: ", ex.getNextException());
+ }
+ }
+ System.err.println("Unable to init mailet " + mailetClassName);
+ System.err.println("Check spool manager logs for more details.");
+ throw new ConfigurationException("Unable to init mailet", ex);
+ }
+ if (mailet != null && matcher != null) {
+ String onMatchException = null;
+ MailetConfig mailetConfig = mailet.getMailetConfig();
+
+ if (mailetConfig instanceof MailetConfigImpl) {
+ onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
+ }
+
+ // Store the matcher to use for splitter in properties
+ processorDef
+ .setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException))
+
+ // do splitting of the mail based on the stored matcher
+ .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
+
+ .choice().when(new MatcherMatch()).process(new MailetProcessor(mailet, logger)).end()
+
+ .choice().when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
+
+ .choice().when(new MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
+
+ // store mailet and matcher
+ mailets.get(processorName).add(mailet);
+ matchers.get(processorName).add(matcher);
+ }
+
+
+ }
+
+ processorDef
+ // start choice
+ .choice()
+
+ // when the mail state did not change till yet ( the end of the route) we need to call the TerminatingMailet to
+ // make sure we don't fall into a endless loop
+ .when(new MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
+
+
+ // dispose when needed
+ .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
+
+ // route it to the next processor
+ .otherwise().process(mailProcessor).stop();
+
+ processors.put(processorName, new ChildMailProcessor(processorName));
+ }
+
+ }
+ }
+
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java Wed Oct 6 12:15:05 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
@@ -73,7 +74,7 @@ public class JamesSpoolManager implement
/**
* Number of active threads
*/
- private int numActive;
+ private AtomicInteger numActive = new AtomicInteger(0);;
/**
* Spool threads are active
@@ -132,7 +133,6 @@ public class JamesSpoolManager implement
}
active.set(true);
- numActive = 0;
spoolThreads = new java.util.ArrayList<Thread>(numThreads);
for ( int i = 0 ; i < numThreads ; i++ ) {
Thread reader = new Thread(this, "Spool Thread #" + i);
@@ -153,8 +153,9 @@ public class JamesSpoolManager implement
logger.info("Spool=" + queue.getClass().getName());
}
- numActive++;
while(active.get()) {
+ numActive.incrementAndGet();
+
try {
queue.deQueue(new DequeueOperation() {
@@ -189,12 +190,14 @@ public class JamesSpoolManager implement
logger.error("Exception processing mail in JamesSpoolManager.run "
+ e.getMessage(), e);
}
+ } finally {
+ numActive.decrementAndGet();
}
+
}
if (logger.isInfoEnabled()){
logger.info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
}
- numActive--;
}
/**
@@ -217,11 +220,13 @@ public class JamesSpoolManager implement
long stop = System.currentTimeMillis() + 60000;
// give the spooler threads one minute to terminate gracefully
- while (numActive != 0 && stop > System.currentTimeMillis()) {
+ /*
+ while (numActive.get() != 0 && stop > System.currentTimeMillis()) {
try {
Thread.sleep(1000);
} catch (Exception ignored) {}
}
+ */
logger.info("JamesSpoolManager thread shutdown completed.");
}
Modified: james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Wed Oct 6 12:15:05 2010
@@ -21,10 +21,12 @@
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:camel="http://camel.apache.org/schema/spring"
+ xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
- http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
-
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
<!--
** JMX part ** to enable exposure of JMX, activate the following beans
@@ -124,31 +126,28 @@
<camel:camelContext id="jamesCamelContext" trace="false" >
<camel:jmxAgent id="agent" disabled="true"/>
<camel:template id="producerTemplate"/>
- <camel:routeBuilder ref="processorRoute" />
</camel:camelContext>
-
- <!-- jms connection pooling -->
- <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
- <property name="connectionFactory">
- <bean class="org.apache.activemq.ActiveMQConnectionFactory">
- <property name="brokerURL" value="vm://localhost?broker.useJmx=false"/>
- </bean>
- </property>
- <!--
- <property name="transactionManager" ref="jmsTransactionManager"/>
- -->
+ <!-- lets create an embedded ActiveMQ Broker -->
+ <amq:broker useJmx="false" persistent="true" dataDirectory="../data/" schedulerSupport="true" id="broker">
+ <amq:transportConnectors>
+ <amq:transportConnector uri="tcp://localhost:0" />
+ </amq:transportConnectors>
+ </amq:broker>
+
+
+ <amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://localhost?create=false" />
+
+ <!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
+ <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
+ <constructor-arg ref="amqConnectionFactory" />
+ <property name="sessionCacheSize" value="100" />
</bean>
<!-- setup spring jms TX manager -->
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
-
- <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
- <property name="config" value="file://conf/activemq.xml" />
- <property name="start" value="true" />
- </bean>
<bean id="mailQueueFactory" class="org.apache.james.queue.activemq.ActiveMQMailQueueFactory" depends-on="broker">
<!-- Allow to specify if BlobMessage or BytesMessage should be used for storing the Mail in the queue-->
@@ -156,6 +155,7 @@
<!-- By default only BytesMessage is used -->
<property name="sizeTreshold" value="-1"/>
</bean>
+
<!-- Build the camelroute from the spoolmanager.xml -->
<bean id="mailProcessor" name="processorRoute" class="org.apache.james.mailetcontainer.camel.CamelMailProcessorList"/>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org