You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/10/06 14:15:05 UTC

svn commit: r1005005 - in /james/server/trunk: mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/ spoolmanager/src/main/java/org/apache/james/transport/ spring-deployment/src/main/config/james/

Author: norman
Date: Wed Oct  6 12:15:05 2010
New Revision: 1005005

URL: http://svn.apache.org/viewvc?rev=1005005&view=rev
Log:
* Use amq: namespace to configure it via spring, so no need to activemq.xml. 
* Some more fine tuning to make sure that JamesSpoolManager is really thread-safe on shutdown

Removed:
    james/server/trunk/spring-deployment/src/main/config/james/activemq.xml
Modified:
    james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
    james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml

Modified: james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java
URL: http://svn.apache.org/viewvc/james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java (original)
+++ james/server/trunk/mailetcontainer-camel/src/main/java/org/apache/james/mailetcontainer/camel/CamelMailProcessorList.java Wed Oct  6 12:15:05 2010
@@ -25,10 +25,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
 import javax.annotation.Resource;
 import javax.mail.MessagingException;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.CamelExecutionException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -60,7 +63,7 @@ import org.apache.mailet.base.MatcherInv
  * It also offer the {@link MailProcessorList} implementation which allow to inject {@link Mail} into the routes
  * 
  */
-public class CamelMailProcessorList extends RouteBuilder implements Configurable, LogEnabled, MailProcessorList {
+public class CamelMailProcessorList implements Configurable, LogEnabled, MailProcessorList, CamelContextAware {
 
     private MatcherLoader matcherLoader;
     private HierarchicalConfiguration config;
@@ -83,150 +86,16 @@ public class CamelMailProcessorList exte
     }
 
     private ProducerTemplate producerTemplate;
-    /*
-     * (non-Javadoc)
-     * @see org.apache.camel.builder.RouteBuilder#configure()
-     */
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure() throws Exception {
-        Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
-        Processor disposeProcessor = new DisposeProcessor();
-        Processor mailProcessor = new MailCamelProcessor();
-        Processor removePropsProcessor = new RemovePropertiesProcessor();
-        
-        List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
-        for (int i = 0; i < processorConfs.size(); i++) {
-            final HierarchicalConfiguration processorConf = processorConfs.get(i);
-            String processorName = processorConf.getString("[@name]");
-
-          
-            mailets.put(processorName, new ArrayList<Mailet>());
-            matchers.put(processorName, new ArrayList<Matcher>());
-
-            RouteDefinition processorDef = from(getEndpoint(processorName)).inOnly()
-            // store the logger in properties
-            .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));   
-               
-            
-            final List<HierarchicalConfiguration> mailetConfs = processorConf.configurationsAt("mailet");
-            // Loop through the mailet configuration, load
-            // all of the matcher and mailets, and add
-            // them to the processor.
-            for (int j = 0; j < mailetConfs.size(); j++) {
-                HierarchicalConfiguration c = mailetConfs.get(j);
-
-                // We need to set this because of correctly parsing comma
-                String mailetClassName = c.getString("[@class]");
-                String matcherName = c.getString("[@match]", null);
-                String invertedMatcherName = c.getString("[@notmatch]", null);
-
-                Mailet mailet = null;
-                Matcher matcher = null;
-                try {
-
-                    if (matcherName != null && invertedMatcherName != null) {
-                        // if no matcher is configured throw an Exception
-                        throw new ConfigurationException("Please configure only match or nomatch per mailet");
-                    } else if (matcherName != null) {
-                        matcher = matcherLoader.getMatcher(matcherName);
-                    } else if (invertedMatcherName != null) {
-                        matcher = new MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
-
-                    } else {
-                        // default matcher is All
-                        matcher = matcherLoader.getMatcher("All");
-                    }
-
-                    // The matcher itself should log that it's been inited.
-                    if (logger.isInfoEnabled()) {
-                        StringBuffer infoBuffer = new StringBuffer(64).append("Matcher ").append(matcherName).append(" instantiated.");
-                        logger.info(infoBuffer.toString());
-                    }
-                } catch (MessagingException ex) {
-                    // **** Do better job printing out exception
-                    if (logger.isErrorEnabled()) {
-                        StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(": ").append(ex.toString());
-                        logger.error(errorBuffer.toString(), ex);
-                        if (ex.getNextException() != null) {
-                            logger.error("Caused by nested exception: ", ex.getNextException());
-                        }
-                    }
-                    System.err.println("Unable to init matcher " + matcherName);
-                    System.err.println("Check spool manager logs for more details.");
-                    // System.exit(1);
-                    throw new ConfigurationException("Unable to init matcher", ex);
-                }
-                try {
-                    mailet = mailetLoader.getMailet(mailetClassName, c);
-                    if (logger.isInfoEnabled()) {
-                        StringBuffer infoBuffer = new StringBuffer(64).append("Mailet ").append(mailetClassName).append(" instantiated.");
-                        logger.info(infoBuffer.toString());
-                    }
-                } catch (MessagingException ex) {
-                    // **** Do better job printing out exception
-                    if (logger.isErrorEnabled()) {
-                        StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(": ").append(ex.toString());
-                        logger.error(errorBuffer.toString(), ex);
-                        if (ex.getNextException() != null) {
-                            logger.error("Caused by nested exception: ", ex.getNextException());
-                        }
-                    }
-                    System.err.println("Unable to init mailet " + mailetClassName);
-                    System.err.println("Check spool manager logs for more details.");
-                    throw new ConfigurationException("Unable to init mailet", ex);
-                }
-                if (mailet != null && matcher != null) {
-                    String onMatchException = null;
-                    MailetConfig mailetConfig = mailet.getMailetConfig();
-                    
-                    if (mailetConfig instanceof MailetConfigImpl) {
-                        onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
-                    }
-                    
-                    // Store the matcher to use for splitter in properties
-                    processorDef
-                    		.setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException))
-                            
-                            // do splitting of the mail based on the stored matcher
-                            .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
-
-                            .choice().when(new MatcherMatch()).process(new MailetProcessor(mailet, logger)).end()
-                            
-                            .choice().when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
-
-                            .choice().when(new MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
-
-                    // store mailet and matcher
-                    mailets.get(processorName).add(mailet);
-                    matchers.get(processorName).add(matcher);
-                }
-              
+	private CamelContext camelContext;
+    
 
-            }
-            
-            processorDef
-                    // start choice
-                    .choice()
-                 
-                    // when the mail state did not change till yet ( the end of the route) we need to call the TerminatingMailet to
-                    // make sure we don't fall into a endless loop
-                    .when(new MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
-                    
-                       
-                    // dispose when needed
-                    .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
-                    
-                     // route it to the next processor
-                    .otherwise().process(mailProcessor).stop();
-                  
-            processors.put(processorName, new ChildMailProcessor(processorName));
-        }
-                
-        producerTemplate = getContext().createProducerTemplate();
-    }
 
+	@PostConstruct
+	public void init() throws Exception  {
+		getCamelContext().addRoutes(new SpoolRouteBuilder());
 
+		producerTemplate = getCamelContext().createProducerTemplate();
+	}
     /**
      * Destroy all mailets and matchers
      */
@@ -428,4 +297,156 @@ public class CamelMailProcessorList exte
         
     }
 
+	public CamelContext getCamelContext() {
+		return camelContext;
+	}
+
+	public void setCamelContext(CamelContext camelContext) {
+		this.camelContext = camelContext;
+	}
+	
+	private final class SpoolRouteBuilder extends RouteBuilder {
+		/*
+	     * (non-Javadoc)
+	     * @see org.apache.camel.builder.RouteBuilder#configure()
+	     */
+	    @SuppressWarnings("unchecked")
+	    @Override
+	    public void configure() throws Exception {
+	        Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
+	        Processor disposeProcessor = new DisposeProcessor();
+	        Processor mailProcessor = new MailCamelProcessor();
+	        Processor removePropsProcessor = new RemovePropertiesProcessor();
+	        
+	        List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
+	        for (int i = 0; i < processorConfs.size(); i++) {
+	            final HierarchicalConfiguration processorConf = processorConfs.get(i);
+	            String processorName = processorConf.getString("[@name]");
+
+	          
+	            mailets.put(processorName, new ArrayList<Mailet>());
+	            matchers.put(processorName, new ArrayList<Matcher>());
+
+	            RouteDefinition processorDef = from(getEndpoint(processorName)).inOnly()
+	            // store the logger in properties
+	            .setProperty(MatcherSplitter.LOGGER_PROPERTY, constant(logger));   
+	               
+	            
+	            final List<HierarchicalConfiguration> mailetConfs = processorConf.configurationsAt("mailet");
+	            // Loop through the mailet configuration, load
+	            // all of the matcher and mailets, and add
+	            // them to the processor.
+	            for (int j = 0; j < mailetConfs.size(); j++) {
+	                HierarchicalConfiguration c = mailetConfs.get(j);
+
+	                // We need to set this because of correctly parsing comma
+	                String mailetClassName = c.getString("[@class]");
+	                String matcherName = c.getString("[@match]", null);
+	                String invertedMatcherName = c.getString("[@notmatch]", null);
+
+	                Mailet mailet = null;
+	                Matcher matcher = null;
+	                try {
+
+	                    if (matcherName != null && invertedMatcherName != null) {
+	                        // if no matcher is configured throw an Exception
+	                        throw new ConfigurationException("Please configure only match or nomatch per mailet");
+	                    } else if (matcherName != null) {
+	                        matcher = matcherLoader.getMatcher(matcherName);
+	                    } else if (invertedMatcherName != null) {
+	                        matcher = new MatcherInverter(matcherLoader.getMatcher(invertedMatcherName));
+
+	                    } else {
+	                        // default matcher is All
+	                        matcher = matcherLoader.getMatcher("All");
+	                    }
+
+	                    // The matcher itself should log that it's been inited.
+	                    if (logger.isInfoEnabled()) {
+	                        StringBuffer infoBuffer = new StringBuffer(64).append("Matcher ").append(matcherName).append(" instantiated.");
+	                        logger.info(infoBuffer.toString());
+	                    }
+	                } catch (MessagingException ex) {
+	                    // **** Do better job printing out exception
+	                    if (logger.isErrorEnabled()) {
+	                        StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init matcher ").append(matcherName).append(": ").append(ex.toString());
+	                        logger.error(errorBuffer.toString(), ex);
+	                        if (ex.getNextException() != null) {
+	                            logger.error("Caused by nested exception: ", ex.getNextException());
+	                        }
+	                    }
+	                    System.err.println("Unable to init matcher " + matcherName);
+	                    System.err.println("Check spool manager logs for more details.");
+	                    // System.exit(1);
+	                    throw new ConfigurationException("Unable to init matcher", ex);
+	                }
+	                try {
+	                    mailet = mailetLoader.getMailet(mailetClassName, c);
+	                    if (logger.isInfoEnabled()) {
+	                        StringBuffer infoBuffer = new StringBuffer(64).append("Mailet ").append(mailetClassName).append(" instantiated.");
+	                        logger.info(infoBuffer.toString());
+	                    }
+	                } catch (MessagingException ex) {
+	                    // **** Do better job printing out exception
+	                    if (logger.isErrorEnabled()) {
+	                        StringBuffer errorBuffer = new StringBuffer(256).append("Unable to init mailet ").append(mailetClassName).append(": ").append(ex.toString());
+	                        logger.error(errorBuffer.toString(), ex);
+	                        if (ex.getNextException() != null) {
+	                            logger.error("Caused by nested exception: ", ex.getNextException());
+	                        }
+	                    }
+	                    System.err.println("Unable to init mailet " + mailetClassName);
+	                    System.err.println("Check spool manager logs for more details.");
+	                    throw new ConfigurationException("Unable to init mailet", ex);
+	                }
+	                if (mailet != null && matcher != null) {
+	                    String onMatchException = null;
+	                    MailetConfig mailetConfig = mailet.getMailetConfig();
+	                    
+	                    if (mailetConfig instanceof MailetConfigImpl) {
+	                        onMatchException = ((MailetConfigImpl) mailetConfig).getInitAttribute("onMatchException");
+	                    }
+	                    
+	                    // Store the matcher to use for splitter in properties
+	                    processorDef
+	                    		.setProperty(MatcherSplitter.MATCHER_PROPERTY, constant(matcher)).setProperty(MatcherSplitter.ON_MATCH_EXCEPTION_PROPERTY, constant(onMatchException))
+	                            
+	                            // do splitting of the mail based on the stored matcher
+	                            .split().method(MatcherSplitter.class).aggregationStrategy(aggr).parallelProcessing()
+
+	                            .choice().when(new MatcherMatch()).process(new MailetProcessor(mailet, logger)).end()
+	                            
+	                            .choice().when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop().otherwise().process(removePropsProcessor).end()
+
+	                            .choice().when(new MailStateNotEquals(processorName)).process(mailProcessor).stop().end();
+
+	                    // store mailet and matcher
+	                    mailets.get(processorName).add(mailet);
+	                    matchers.get(processorName).add(matcher);
+	                }
+	              
+
+	            }
+	            
+	            processorDef
+	                    // start choice
+	                    .choice()
+	                 
+	                    // when the mail state did not change till yet ( the end of the route) we need to call the TerminatingMailet to
+	                    // make sure we don't fall into a endless loop
+	                    .when(new MailStateEquals(processorName)).process(terminatingMailetProcessor).stop()
+	                    
+	                       
+	                    // dispose when needed
+	                    .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
+	                    
+	                     // route it to the next processor
+	                    .otherwise().process(mailProcessor).stop();
+	                  
+	            processors.put(processorName, new ChildMailProcessor(processorName));
+	        }
+	                
+	    }
+	}
+
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/JamesSpoolManager.java Wed Oct  6 12:15:05 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.annotation.PostConstruct;
 import javax.annotation.PreDestroy;
@@ -73,7 +74,7 @@ public class JamesSpoolManager implement
     /**
      * Number of active threads
      */
-    private int numActive;
+    private AtomicInteger numActive = new AtomicInteger(0);;
 
     /**
      * Spool threads are active
@@ -132,7 +133,6 @@ public class JamesSpoolManager implement
         }
 
         active.set(true);
-        numActive = 0;
         spoolThreads = new java.util.ArrayList<Thread>(numThreads);
         for ( int i = 0 ; i < numThreads ; i++ ) {
             Thread reader = new Thread(this, "Spool Thread #" + i);
@@ -153,8 +153,9 @@ public class JamesSpoolManager implement
             logger.info("Spool=" + queue.getClass().getName());
         }
 
-        numActive++;
         while(active.get()) {
+            numActive.incrementAndGet();
+
             try {
                 queue.deQueue(new DequeueOperation() {
                     
@@ -189,12 +190,14 @@ public class JamesSpoolManager implement
                     logger.error("Exception processing mail in JamesSpoolManager.run "
                                       + e.getMessage(), e);
                 }
+            } finally {
+                numActive.decrementAndGet();
             }
+
         }
         if (logger.isInfoEnabled()){
             logger.info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
         }
-        numActive--;
     }
 
     /**
@@ -217,11 +220,13 @@ public class JamesSpoolManager implement
 
         long stop = System.currentTimeMillis() + 60000;
         // give the spooler threads one minute to terminate gracefully
-        while (numActive != 0 && stop > System.currentTimeMillis()) {
+        /*
+        while (numActive.get() != 0 && stop > System.currentTimeMillis()) {
             try {
                 Thread.sleep(1000);
             } catch (Exception ignored) {}
         }
+        */
         logger.info("JamesSpoolManager thread shutdown completed.");
     }
 

Modified: james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=1005005&r1=1005004&r2=1005005&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Wed Oct  6 12:15:05 2010
@@ -21,10 +21,12 @@
 <beans xmlns="http://www.springframework.org/schema/beans" 
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:camel="http://camel.apache.org/schema/spring"
+       xmlns:amq="http://activemq.apache.org/schema/core" 
        xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
-          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
-
+          http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
+          http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+          
 
     <!--
         ** JMX part ** to enable exposure of JMX, activate the following beans
@@ -124,31 +126,28 @@
     <camel:camelContext id="jamesCamelContext" trace="false" >
         <camel:jmxAgent id="agent" disabled="true"/>
         <camel:template id="producerTemplate"/>   
-        <camel:routeBuilder ref="processorRoute" /> 
     </camel:camelContext>
 
-   
-    <!-- jms connection pooling  -->
-    <bean id="jmsConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
-        <property name="connectionFactory">
-            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
-                <property name="brokerURL" value="vm://localhost?broker.useJmx=false"/>
-            </bean>
-        </property>
-        <!--
-        <property name="transactionManager" ref="jmsTransactionManager"/>
-        -->
+    <!--  lets create an embedded ActiveMQ Broker -->
+    <amq:broker useJmx="false" persistent="true" dataDirectory="../data/" schedulerSupport="true" id="broker">
+        <amq:transportConnectors>
+            <amq:transportConnector uri="tcp://localhost:0" />
+        </amq:transportConnectors>
+    </amq:broker>
+
+
+    <amq:connectionFactory id="amqConnectionFactory" brokerURL="vm://localhost?create=false" />
+
+    <!-- CachingConnectionFactory Definition, sessionCacheSize property is the number of sessions to cache -->
+    <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
+        <constructor-arg ref="amqConnectionFactory" />
+        <property name="sessionCacheSize" value="100" />
     </bean>
 
     <!-- setup spring jms TX manager -->
     <bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
         <property name="connectionFactory" ref="jmsConnectionFactory"/>
     </bean>
-    
-    <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
-        <property name="config" value="file://conf/activemq.xml" />
-        <property name="start" value="true" />
-    </bean>
 
     <bean id="mailQueueFactory" class="org.apache.james.queue.activemq.ActiveMQMailQueueFactory" depends-on="broker">
         <!-- Allow to specify if BlobMessage or BytesMessage should be used for storing the Mail in the queue-->
@@ -156,6 +155,7 @@
         <!-- By default only BytesMessage is used -->
         <property name="sizeTreshold" value="-1"/>
     </bean>
+   
         
     <!-- Build the camelroute from the spoolmanager.xml -->
     <bean id="mailProcessor" name="processorRoute" class="org.apache.james.mailetcontainer.camel.CamelMailProcessorList"/>



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org