You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/09/15 12:42:58 UTC
svn commit: r997272 - in /james/server/trunk:
spoolmanager/src/main/java/org/apache/james/
spoolmanager/src/main/java/org/apache/james/transport/camel/
spring-deployment/src/main/config/james/
Author: norman
Date: Wed Sep 15 10:42:58 2010
New Revision: 997272
URL: http://svn.apache.org/viewvc?rev=997272&view=rev
Log:
Use one jms queue for the incomming spool (before we used one queue per processor in the spool). This improves performance and will help to simplify spool/queue management a lot (JAMES-1046)
Added:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java
Removed:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQRecipientList.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSRecipientList.java
Modified:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java
james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/AbstractMailServer.java Wed Sep 15 10:42:58 2010
@@ -41,7 +41,6 @@ import javax.mail.internet.ParseExceptio
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
-import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
@@ -61,6 +60,7 @@ import org.apache.james.lifecycle.Lifecy
import org.apache.james.lifecycle.LogEnabled;
import org.apache.james.services.MailServer;
import org.apache.james.transport.camel.DisposeProcessor;
+import org.apache.james.transport.camel.JamesCamelConstants;
import org.apache.mailet.Mail;
import org.apache.mailet.MailAddress;
@@ -309,7 +309,7 @@ public abstract class AbstractMailServer
*/
public void sendMail(Mail mail) throws MessagingException {
try {
- producerTemplate.sendBody("direct:mailserver", ExchangePattern.InOnly, mail);
+ producerTemplate.sendBodyAndHeader("direct:mailserver", ExchangePattern.InOnly, mail, JamesCamelConstants.JAMES_MAIL_STATE, mail.getState());
} catch (Exception e) {
logger.error("Error storing message: " + e.getMessage(),e);
@@ -437,13 +437,12 @@ public abstract class AbstractMailServer
}
/**
- * Return the camel endpoint uri which should get used for the given mail
+ * Return the camel endpoint uri which should get used
*
- * @param mail
* @return toUri
*
*/
- protected abstract String getToUri(Mail mail);
+ protected abstract String getToUri();
@@ -465,7 +464,6 @@ public abstract class AbstractMailServer
private final class InjectionRouteBuilder extends RouteBuilder {
- private final static String SLIP ="JAMES_TO_SLIP";
@Override
public void configure() throws Exception {
Processor disposeProcessor = new DisposeProcessor();
@@ -479,13 +477,7 @@ public abstract class AbstractMailServer
// dispose the mail object if route processing was complete
.onCompletion().process(disposeProcessor).end()
- .transacted().pipeline().beanRef("mailClaimCheck").process(new Processor() {
-
- public void process(Exchange ex) throws Exception {
-
- ex.getIn().setHeader(SLIP, getToUri(ex.getIn().getBody(Mail.class)));
- }
- }).routingSlip(SLIP);
+ .transacted().pipeline().beanRef("mailClaimCheck").to(getToUri());
}
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/ActiveMQMailServer.java Wed Sep 15 10:42:58 2010
@@ -21,8 +21,6 @@
package org.apache.james;
-import org.apache.mailet.Mail;
-
/**
* MailServer implementation which use ActiveMQ to spool mails
*
@@ -31,7 +29,7 @@ import org.apache.mailet.Mail;
public class ActiveMQMailServer extends AbstractMailServer{
@Override
- protected String getToUri(Mail mail) {
- return "activemq:queue:processor."+ mail.getState();
+ protected String getToUri() {
+ return "activemq:queue:spool";
}
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/JMSMailServer.java Wed Sep 15 10:42:58 2010
@@ -21,10 +21,8 @@
package org.apache.james;
-import org.apache.mailet.Mail;
-
/**
- * MailServer implementation which use JMS to spool mails#
+ * MailServer implementation which use JMS to spool mails
*
* If you use ActiveMQ for JMS you should use {@link ActiveMQMailServer}
*
@@ -33,7 +31,7 @@ import org.apache.mailet.Mail;
public class JMSMailServer extends AbstractMailServer{
@Override
- protected String getToUri(Mail mail) {
- return "jms:queue:processor."+ mail.getState();
+ protected String getToUri() {
+ return "jms:queue:spool";
}
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/AbstractProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010
@@ -90,6 +90,10 @@ public abstract class AbstractProcessorR
Processor terminatingMailetProcessor = new MailetProcessor(new TerminatingMailet(), logger);
Processor disposeProcessor = new DisposeProcessor();
+
+ from(getFromUri()).recipientList().method(ProcessorRecipientList.class);
+
+
List<HierarchicalConfiguration> processorConfs = config.configurationsAt("processor");
for (int i = 0; i < processorConfs.size(); i++) {
final HierarchicalConfiguration processorConf = processorConfs.get(i);
@@ -101,7 +105,7 @@ public abstract class AbstractProcessorR
matchers.put(processorName, new ArrayList<Matcher>());
// Check which route we need to go
- ChoiceDefinition processorDef = fromF(getFromUri(processorName))
+ ChoiceDefinition processorDef = from("direct:processor." + processorName)
// exchange mode is inOnly
.inOnly()
@@ -224,8 +228,8 @@ public abstract class AbstractProcessorR
.when(new MailStateEquals(Mail.GHOST)).process(disposeProcessor).stop()
// check if the state of the mail is the same as the
- // current processor. If not just route it to the right endpoint via recipientList and stop processing.
- .when(new MailStateNotEquals(processorName)).beanRef("mailClaimCheck").recipientList().method(getRecipientList()).stop()
+ // current processor. If not just route it to the spool again
+ .when(new MailStateNotEquals(processorName)).beanRef("mailClaimCheck").recipientList().method(ProcessorRecipientList.class).stop()
// end first choice
.end()
@@ -258,8 +262,8 @@ public abstract class AbstractProcessorR
// end the choice
.end()
- // route it to the right processor
- .beanRef("mailClaimCheck").recipientList().method(getRecipientList());
+ // route it to the spool again
+ .beanRef("mailClaimCheck").recipientList().method(ProcessorRecipientList.class);
}
}
@@ -404,17 +408,11 @@ public abstract class AbstractProcessorR
}
/**
- * Return the uri for the processor to use for consuming mails
+ * Return the uri for consuming mail
*
- * @param processor
* @return consumerUri
*/
- protected abstract String getFromUri(String processor);
+ protected abstract String getFromUri();
- /**
- * Return the class which get used for dynamic lookup the ToUris for the mails (producers)
- *
- * @return recipientListClass
- */
- protected abstract Class<?> getRecipientList();
+
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ActiveMQProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010
@@ -33,13 +33,12 @@ public class ActiveMQProcessorRouteBuild
}
@Override
- protected String getFromUri(String processorName) {
- return "activemq:queue:processor." + processorName+"?maxConcurrentConsumers=" + maxConcurrentConsumers;
- }
-
- @Override
- protected Class<?> getRecipientList() {
- return ActiveMQRecipientList.class;
+ protected String getFromUri() {
+ return "activemq:queue:spool?" + getOptions();
}
+
+ protected String getOptions() {
+ return "maxConcurrentConsumers=" + maxConcurrentConsumers;
+ }
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JMSProcessorRouteBuilder.java Wed Sep 15 10:42:58 2010
@@ -23,22 +23,11 @@ package org.apache.james.transport.camel
*
* If you want to use ActiveMQ as JMS implementation you should use {@link ActiveMQProcessorRouteBuilder}
*/
-public class JMSProcessorRouteBuilder extends AbstractProcessorRouteBuilder{
+public class JMSProcessorRouteBuilder extends ActiveMQProcessorRouteBuilder{
- private int maxConcurrentConsumers = 20;
- public void setMaxConcurrentConsumers(int maxConcurrentConsumers) {
- this.maxConcurrentConsumers = maxConcurrentConsumers;
- }
-
- @Override
- protected String getFromUri(String processorName) {
- return "jms:queue:processor." + processorName+"?maxConcurrentConsumers=" + maxConcurrentConsumers;
+ @Override
+ protected String getFromUri() {
+ return "jms:queue:spool ?" + getOptions();
}
-
- @Override
- protected Class<?> getRecipientList() {
- return JMSRecipientList.class;
- }
-
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/JamesCamelConstants.java Wed Sep 15 10:42:58 2010
@@ -36,4 +36,5 @@ public interface JamesCamelConstants {
public final static String JAMES_RETRY_DELIVERY = "JAMES_RETRY_DELIVERY";
+ public final static String JAMES_MAIL_STATE = "JAMES_MAIL_STATE";
}
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java?rev=997272&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/camel/ProcessorRecipientList.java Wed Sep 15 10:42:58 2010
@@ -0,0 +1,29 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.transport.camel;
+
+import org.apache.camel.Body;
+import org.apache.mailet.Mail;
+
+public final class ProcessorRecipientList {
+
+ public String to(@Body Mail mail) {
+ return "direct:processor." + mail.getState();
+ }
+}
Modified: james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml?rev=997272&r1=997271&r2=997272&view=diff
==============================================================================
--- james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml (original)
+++ james/server/trunk/spring-deployment/src/main/config/james/spring-beans.xml Wed Sep 15 10:42:58 2010
@@ -124,11 +124,12 @@
<camel:routeBuilder ref="processorRoute" />
</camel:camelContext>
+
<bean id="pollingjms" class="org.apache.james.transport.camel.JMSSelectorPollingComponent"/>
<!-- Build the camelroute from the spoolmanager.xml using ActiveMQ as producer and consumer-->
<bean id="spoolmanager" name="processorRoute" class="org.apache.james.transport.camel.ActiveMQProcessorRouteBuilder">
- <property name="maxConcurrentConsumers" value="200"/>
+ <property name="maxConcurrentConsumers" value="500"/>
</bean>
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org