You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/09/15 12:42:58 UTC

svn commit: r997272 - in /james/server/trunk: spoolmanager/src/main/java/org/apache/james/ spoolmanager/src/main/java/org/apache/james/transport/camel/ spring-deployment/src/main/config/james/

Author: norman
Date: Wed Sep 15 10:42:58 2010
New Revision: 997272

URL: http://svn.apache.org/viewvc?rev=997272&view=rev
Log:
Use one jms queue for the incomming spool (before we used one queue per processor in the spool). This improves performance and will help to simplify spool/queue management a lot (JAMES-1046)

Added:
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java
Removed:
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQRecipientList.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSRecipientList.java
Modified:
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java
    james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java Wed Sep 15 10:42:58 2010
@@ -41,7 +41,6 @@ import javax.mail.internet.ParseExceptio
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
-import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
@@ -61,6 +60,7 @@ import org.apache.james.lifecycle.Lifecy
 import org.apache.james.lifecycle.LogEnabled;
 import org.apache.james.services.MailServer;
 import org.apache.james.transport.camel.DisposeProcessor;
+import org.apache.james.transport.camel.JamesCamelConstants;
 import org.apache.mailet.Mail;
 import org.apache.mailet.MailAddress;
 
@@ -309,7 +309,7 @@ public abstract class AbstractMailServer
      */
     public void sendMail(Mail mail) throws MessagingException {
         try {
-            producerTemplate.sendBody("direct:mailserver", ExchangePattern.InOnly, mail);
+            producerTemplate.sendBodyAndHeader("direct:mailserver", ExchangePattern.InOnly, mail, JamesCamelConstants.JAMES_MAIL_STATE, mail.getState());
                         
         } catch (Exception e) {
             logger.error("Error storing message: " + e.getMessage(),e);
@@ -437,13 +437,12 @@ public abstract class AbstractMailServer
     }
     
     /**
-     * Return the camel endpoint uri which should get used for the given mail
+     * Return the camel endpoint uri which should get used
      * 
-     * @param mail
      * @return toUri
      * 
      */
-    protected abstract String getToUri(Mail mail);
+    protected abstract String getToUri();
     
     
 
@@ -465,7 +464,6 @@ public abstract class AbstractMailServer
     
        
     private final class InjectionRouteBuilder extends RouteBuilder {
-        private final static String SLIP ="JAMES_TO_SLIP";
         @Override
         public void configure() throws Exception {
             Processor disposeProcessor = new DisposeProcessor();
@@ -479,13 +477,7 @@ public abstract class AbstractMailServer
             // dispose the mail object if route processing was complete
            .onCompletion().process(disposeProcessor).end()
 
-           .transacted().pipeline().beanRef("mailClaimCheck").process(new Processor() {
-
-               public void process(Exchange ex) throws Exception {
-
-                   ex.getIn().setHeader(SLIP, getToUri(ex.getIn().getBody(Mail.class)));
-               }
-           }).routingSlip(SLIP);
+           .transacted().pipeline().beanRef("mailClaimCheck").to(getToUri());
         }
 
     }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java Wed Sep 15 10:42:58 2010
@@ -21,8 +21,6 @@
 package org.apache.james;
 
 
-import org.apache.mailet.Mail;
-
 /**
  * MailServer implementation which use ActiveMQ to spool mails
  * 
@@ -31,7 +29,7 @@ import org.apache.mailet.Mail;
 public class ActiveMQMailServer extends AbstractMailServer{
 
     @Override
-    protected String getToUri(Mail mail) {
-        return "activemq:queue:processor."+ mail.getState();
+    protected String getToUri() {
+        return "activemq:queue:spool";
     }
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java Wed Sep 15 10:42:58 2010
@@ -21,10 +21,8 @@
 package org.apache.james;
 
 
-import org.apache.mailet.Mail;
-
 /**
- * MailServer implementation which use JMS to spool mails#
+ * MailServer implementation which use JMS to spool mails
  * 
  * If you use ActiveMQ for JMS you should use {@link ActiveMQMailServer}
  * 
@@ -33,7 +31,7 @@ import org.apache.mailet.Mail;
 public class JMSMailServer extends AbstractMailServer{
 
     @Override
-    protected String getToUri(Mail mail) {
-        return "jms:queue:processor."+ mail.getState();
+    protected String getToUri() {
+        return "jms:queue:spool";
     }
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010
@@ -90,6 +90,10 @@ public abstract class AbstractProcessorR
         Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
         Processor disposeProcessor = new DisposeProcessor();
         
+        
+        from(getFromUri()).recipientList().method(ProcessorRecipientList.class);
+        
+        
         List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
         for (int i = 0; i < processorConfs.size(); i++) {
             final HierarchicalConfiguration processorConf = processorConfs.get(i);
@@ -101,7 +105,7 @@ public abstract class AbstractProcessorR
             matchers.put(processorName, new ArrayList<Matcher>());
 
             // Check which route we need to go
-            ChoiceDefinition processorDef = fromF(getFromUri(processorName))
+            ChoiceDefinition processorDef = from("direct:processor." + processorName)
             
                 // exchange mode is inOnly
                 .inOnly()
@@ -224,8 +228,8 @@ public abstract class AbstractProcessorR
                             .when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
                              
                             // check if the state of the mail is the same as the
-                            // current processor. If not just route it to the right endpoint via recipientList and stop processing.
-                            .when(new MailStateNotEquals(processorName)).beanRef("mailClaimCheck").recipientList().method(getRecipientList()).stop()
+                            // current processor. If not just route it to the spool again
+                            .when(new MailStateNotEquals(processorName)).beanRef("mailClaimCheck").recipientList().method(ProcessorRecipientList.class).stop()
                             
                             // end first choice
                             .end()
@@ -258,8 +262,8 @@ public abstract class AbstractProcessorR
                     // end the choice
                     .end()
                     
-                     // route it to the right processor
-                    .beanRef("mailClaimCheck").recipientList().method(getRecipientList());
+                     // route it to the spool again
+                    .beanRef("mailClaimCheck").recipientList().method(ProcessorRecipientList.class);
                   
         }
     }
@@ -404,17 +408,11 @@ public abstract class AbstractProcessorR
     }
     
     /**
-     * Return the uri for the processor to use for consuming mails
+     * Return the uri for consuming mail
      * 
-     * @param processor
      * @return consumerUri
      */
-    protected abstract String getFromUri(String processor);
+    protected abstract String getFromUri();
 
-    /**
-     * Return the class which get used for dynamic lookup the ToUris for the mails (producers)
-     * 
-     * @return recipientListClass
-     */
-    protected abstract Class<?> getRecipientList();
+  
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010
@@ -33,13 +33,12 @@ public class ActiveMQProcessorRouteBuild
     }
 	
     @Override
-    protected String getFromUri(String processorName) {
-        return "activemq:queue:processor." + processorName+"?maxConcurrentConsumers=" + maxConcurrentConsumers;
-    }
-
-    @Override
-    protected Class<?> getRecipientList() {
-        return ActiveMQRecipientList.class;
+    protected String getFromUri() {
+        return "activemq:queue:spool?" + getOptions();
     }
 
+	
+	protected String getOptions() {
+		return "maxConcurrentConsumers=" + maxConcurrentConsumers;
+	}
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010
@@ -23,22 +23,11 @@ package org.apache.james.transport.camel
  * 
  * If you want to use ActiveMQ as JMS implementation you should use {@link ActiveMQProcessorRouteBuilder}
  */
-public class JMSProcessorRouteBuilder extends AbstractProcessorRouteBuilder{
+public class JMSProcessorRouteBuilder extends ActiveMQProcessorRouteBuilder{
 
-	private int maxConcurrentConsumers = 20;
 
-	public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
-		this.maxConcurrentConsumers = maxConcurrentConsumers;
-	}
-	
-    @Override
-    protected String getFromUri(String processorName) {
-        return "jms:queue:processor." + processorName+"?maxConcurrentConsumers=" + maxConcurrentConsumers;
+	@Override
+    protected String getFromUri() {
+        return "jms:queue:spool ?" + getOptions();
     }
-
-    @Override
-    protected Class<?> getRecipientList() {
-        return JMSRecipientList.class;
-    }
-
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java Wed Sep 15 10:42:58 2010
@@ -36,4 +36,5 @@ public interface JamesCamelConstants {
     public final static String JAMES_RETRY_DELIVERY = "JAMES_RETRY_DELIVERY";
 
 
+    public final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
 }

Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java?rev=997272&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java Wed Sep 15 10:42:58 2010
@@ -0,0 +1,29 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.transport.camel;
+
+import org.apache.camel.Body;
+import org.apache.mailet.Mail;
+
+public final class ProcessorRecipientList {
+	
+	public String to(@Body Mail mail) {
+		return "direct:processor." + mail.getState();
+	}
+}

Modified: james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Wed Sep 15 10:42:58 2010
@@ -124,11 +124,12 @@
         <camel:routeBuilder ref="processorRoute" /> 
     </camel:camelContext>
 
+    
     <bean id="pollingjms"  class="org.apache.james.transport.camel.JMSSelectorPollingComponent"/>
     
     <!-- Build the camelroute from the spoolmanager.xml using ActiveMQ as producer and consumer-->
     <bean id="spoolmanager" name="processorRoute" class="org.apache.james.transport.camel.ActiveMQProcessorRouteBuilder">
-        <property name="maxConcurrentConsumers" value="200"/>
+        <property name="maxConcurrentConsumers" value="500"/>
     </bean>
 
 



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org