You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/09/12 19:50:45 UTC
svn commit: r996349 - in /james/server/trunk: ./ spoolmanager/
spoolmanager/src/main/java/org/apache/james/transport/mailets/
Author: norman
Date: Sun Sep 12 17:50:45 2010
New Revision: 996349
URL: http://svn.apache.org/viewvc?rev=996349&view=rev
Log:
Upgrade to activemq-5.4.0 and use the scheduling feature for RemoteDelivery retry. This should improve performance a lot (JAMES-1043)
Modified:
james/server/trunk/pom.xml
james/server/trunk/spoolmanager/pom.xml
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java
Modified: james/server/trunk/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/pom.xml?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/pom.xml (original)
+++ james/server/trunk/pom.xml Sun Sep 12 17:50:45 2010
@@ -1048,7 +1048,7 @@
<productName>Apache-James Mail Server</productName>
<derby.version>10.5.3.0_1</derby.version>
<camel.version>2.4.0</camel.version>
- <activemq.version>5.3.2</activemq.version>
+ <activemq.version>5.4.0</activemq.version>
<spring.version>3.0.2.RELEASE</spring.version>
<imap.version>0.1-M2-SNAPSHOT</imap.version>
<protocols.version>1.2-SNAPSHOT</protocols.version>
Modified: james/server/trunk/spoolmanager/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/pom.xml?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/pom.xml (original)
+++ james/server/trunk/spoolmanager/pom.xml Sun Sep 12 17:50:45 2010
@@ -68,7 +68,11 @@
<groupId>org.apache.james</groupId>
<artifactId>apache-james-mailbox-api</artifactId>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ </dependency>
+
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging-api</artifactId>
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java Sun Sep 12 17:50:45 2010
@@ -36,7 +36,6 @@ import java.util.Hashtable;
import java.util.Iterator;
import java.util.Locale;
import java.util.Properties;
-import java.util.Random;
import java.util.StringTokenizer;
import javax.annotation.Resource;
@@ -62,7 +61,6 @@ import org.apache.james.api.dnsservice.D
import org.apache.james.api.dnsservice.TemporaryResolutionException;
import org.apache.james.core.MailImpl;
import org.apache.james.services.MailServer;
-import org.apache.james.transport.camel.DisposeProcessor;
import org.apache.james.transport.camel.JamesCamelConstants;
import org.apache.james.util.TimeConverter;
import org.apache.mailet.HostAddress;
@@ -153,7 +151,7 @@ public abstract class AbstractRemoteDeli
/** If false then ANY address errors will cause the transmission to fail */
private boolean sendPartial = false;
- private String outgoingQueueInjectorEndpoint = "direct:outgoingQueueInjectorEndpoint" + instance++;
+ protected String outgoingQueueInjectorEndpoint = "direct:outgoingQueueInjectorEndpoint" + instance++;
/**
* The amount of time JavaMail will wait before giving up on a socket
@@ -197,9 +195,7 @@ public abstract class AbstractRemoteDeli
private CamelContext context;
- private String outgoingQueue;
-
- private String outgoingRetryQueue;
+ protected String outgoingQueue;
@Resource(name = "producerTemplate")
public void setProducerTemplate(ProducerTemplate producerTemplate) {
@@ -292,11 +288,6 @@ public abstract class AbstractRemoteDeli
outgoingQueue = "outgoing";
}
- outgoingRetryQueue = getInitParameter("outgoingRetryQueue");
- if (outgoingRetryQueue == null) {
- outgoingRetryQueue = "outgoing.retry";
- }
-
try {
if (getInitParameter("timeout") != null) {
smtpTimeout = Integer.parseInt(getInitParameter("timeout"));
@@ -367,12 +358,13 @@ public abstract class AbstractRemoteDeli
}
try {
- getCamelContext().addRoutes(new RemoteDeliveryRouteBuilder());
+ getCamelContext().addRoutes(createRouteBuilder());
} catch (Exception e) {
throw new MessagingException("Unable to add camel route");
}
}
+ protected abstract RouteBuilder createRouteBuilder();
/**
* Calculates Total no. of attempts for the specified delayList.
*
@@ -1545,47 +1537,4 @@ public abstract class AbstractRemoteDeli
this.context = context;
}
- /**
- * RouteBuilder which builds the Camel Route for the whole RemoteDelivery
- * Process.
- *
- *
- *
- */
- private final class RemoteDeliveryRouteBuilder extends RouteBuilder {
- private Processor disposeProcessor = new DisposeProcessor();
-
- @Override
- public void configure() throws Exception {
-
- // we need to store the message to offsite storage so use claimcheck
- from(outgoingQueueInjectorEndpoint).inOnly().beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint(outgoingQueue));
-
- from(getOutgoingQueueEndpoint(outgoingQueue)).inOnly().transacted()
- .beanRef("mailEnricher")
- .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).to(getOutgoingRetryQueueEndpoint(outgoingRetryQueue)).otherwise().beanRef("mailClaimCheck").process(disposeProcessor).stop().end();
-
- fromF("pollingjms:queue?delay=30000&consumer.endpointUri=%s", getOutgoingRetryQueueEndpoint(outgoingRetryQueue)).inOnly().transacted()
- .beanRef("mailEnricher")
- .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).beanRef("mailClaimCheck").toF(getOutgoingRetryQueueEndpoint(outgoingRetryQueue)).otherwise().process(disposeProcessor).stop().end();
- }
-
- }
-
- /**
- * Return the Endpoint for the outgoing queue with the given name
- *
- * @param outgoingQueue
- * @return endpointUri
- */
- protected abstract String getOutgoingQueueEndpoint(String outgoingQueue);
-
- /**
- * Return the Endpoint for the outgoing retry queue with the given name
- *
- * @param outgoingRetryQueue
- * @return endpointUri
- */
- protected abstract String getOutgoingRetryQueueEndpoint(String outgoingRetryQueue);
-
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java Sun Sep 12 17:50:45 2010
@@ -18,6 +18,14 @@
****************************************************************/
package org.apache.james.transport.mailets;
+import org.apache.activemq.ScheduledMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.james.transport.camel.DisposeProcessor;
+import org.apache.james.transport.camel.JamesCamelConstants;
+
/**
* RemoteDelivery implementation which uses ActiveMQ for the outgoing spooling /
* queue
@@ -26,15 +34,52 @@ package org.apache.james.transport.maile
*/
public class ActiveMQRemoteDelivery extends AbstractRemoteDelivery {
- @Override
- protected String getOutgoingQueueEndpoint(String outgoingQueue) {
+ /**
+ * RouteBuilder which builds the Camel Route for the whole RemoteDelivery
+ * Process.
+ *
+ *
+ *
+ */
+ private final class RemoteDeliveryRouteBuilder extends RouteBuilder {
+ private final Processor disposeProcessor = new DisposeProcessor();
+ private final Processor headerProcessor = new ActiveMQHeaderProcessor();
+ @Override
+ public void configure() throws Exception {
+
+ // we need to store the message to offsite storage so use claimcheck
+ from(outgoingQueueInjectorEndpoint).inOnly().beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint());
+
+ from(getOutgoingQueueEndpoint()).inOnly().transacted()
+ .beanRef("mailEnricher")
+ .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).process(headerProcessor).beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint()).otherwise().process(disposeProcessor).stop().end();
+ }
+
+ }
+
+ private final class ActiveMQHeaderProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ Message in = exchange.getIn();
+ long nextDeliver = (Long) in.getHeader(JamesCamelConstants.JAMES_NEXT_DELIVERY);
+ long delay = nextDeliver - System.currentTimeMillis();
+ if (delay < 0) {
+ delay = 0;
+ }
+ in.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+
+ }
+
+ }
+
+ private String getOutgoingQueueEndpoint() {
return "activemq:queue:" + outgoingQueue;
}
- @Override
- protected String getOutgoingRetryQueueEndpoint(String outgoingRetryQueue) {
- return "activemq:queue:" + outgoingRetryQueue;
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RemoteDeliveryRouteBuilder();
}
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java Sun Sep 12 17:50:45 2010
@@ -18,6 +18,13 @@
****************************************************************/
package org.apache.james.transport.mailets;
+import javax.mail.MessagingException;
+
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.james.transport.camel.DisposeProcessor;
+import org.apache.james.transport.camel.JamesCamelConstants;
+
/**
* RemoteDelivery implementation which uses JMS for the outgoing spooling /
* queue.
@@ -28,15 +35,63 @@ package org.apache.james.transport.maile
*/
public class JMSRemoteDelivery extends AbstractRemoteDelivery {
+ private String outgoingRetryQueue;
+
+
@Override
- protected String getOutgoingQueueEndpoint(String outgoingQueue) {
+ public void init() throws MessagingException {
+ super.init();
+
+ outgoingRetryQueue = getInitParameter("outgoingRetryQueue");
+ if (outgoingRetryQueue == null) {
+ outgoingRetryQueue = "outgoing.retry";
+ }
+
+ }
+
+
+ /**
+ * RouteBuilder which builds the Camel Route for the whole RemoteDelivery
+ * Process.
+ *
+ *
+ *
+ */
+ private final class RemoteDeliveryRouteBuilder extends RouteBuilder {
+ private Processor disposeProcessor = new DisposeProcessor();
+
+ @Override
+ public void configure() throws Exception {
+
+ // we need to store the message to offsite storage so use claimcheck
+ from(outgoingQueueInjectorEndpoint).inOnly().beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint());
+
+ from(getOutgoingQueueEndpoint()).inOnly().transacted()
+ .beanRef("mailEnricher")
+ .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).beanRef("mailClaimCheck").to(getOutgoingRetryQueueEndpoint()).otherwise().process(disposeProcessor).stop().end();
+
+ fromF("pollingjms:queue?delay=30000&consumer.endpointUri=%s", getOutgoingRetryQueueEndpoint()).inOnly().transacted()
+ .beanRef("mailEnricher")
+ .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).beanRef("mailClaimCheck").toF(getOutgoingRetryQueueEndpoint()).otherwise().process(disposeProcessor).stop().end();
+ }
+
+ }
+
+
+ private String getOutgoingQueueEndpoint() {
return "jms:queue:" + outgoingQueue;
}
- @Override
- protected String getOutgoingRetryQueueEndpoint(String outgoingRetryQueue) {
+
+ private String getOutgoingRetryQueueEndpoint() {
return "jms:queue:" + outgoingRetryQueue;
}
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RemoteDeliveryRouteBuilder();
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org