You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/09/12 19:50:45 UTC

svn commit: r996349 - in /james/server/trunk: ./ spoolmanager/ spoolmanager/src/main/java/org/apache/james/transport/mailets/

Author: norman
Date: Sun Sep 12 17:50:45 2010
New Revision: 996349

URL: http://svn.apache.org/viewvc?rev=996349&view=rev
Log:
Upgrade to activemq-5.4.0 and use the scheduling feature for RemoteDelivery retry. This should improve performance a lot (JAMES-1043)

Modified:
    james/server/trunk/pom.xml
    james/server/trunk/spoolmanager/pom.xml
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java

Modified: james/server/trunk/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/pom.xml?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/pom.xml (original)
+++ james/server/trunk/pom.xml Sun Sep 12 17:50:45 2010
@@ -1048,7 +1048,7 @@
     <productName>Apache-James Mail Server</productName>
     <derby.version>10.5.3.0_1</derby.version>
     <camel.version>2.4.0</camel.version>
-    <activemq.version>5.3.2</activemq.version>
+    <activemq.version>5.4.0</activemq.version>
     <spring.version>3.0.2.RELEASE</spring.version>
     <imap.version>0.1-M2-SNAPSHOT</imap.version>
     <protocols.version>1.2-SNAPSHOT</protocols.version>

Modified: james/server/trunk/spoolmanager/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/pom.xml?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/pom.xml (original)
+++ james/server/trunk/spoolmanager/pom.xml Sun Sep 12 17:50:45 2010
@@ -68,7 +68,11 @@
       <groupId>org.apache.james</groupId>
       <artifactId>apache-james-mailbox-api</artifactId>
     </dependency>
-  
+    <dependency>
+      <groupId>org.apache.activemq</groupId>
+      <artifactId>activemq-core</artifactId>
+    </dependency>
+    
     <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging-api</artifactId>

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/AbstractRemoteDelivery.java Sun Sep 12 17:50:45 2010
@@ -36,7 +36,6 @@ import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Properties;
-import java.util.Random;
 import java.util.StringTokenizer;
 
 import javax.annotation.Resource;
@@ -62,7 +61,6 @@ import org.apache.james.api.dnsservice.D
 import org.apache.james.api.dnsservice.TemporaryResolutionException;
 import org.apache.james.core.MailImpl;
 import org.apache.james.services.MailServer;
-import org.apache.james.transport.camel.DisposeProcessor;
 import org.apache.james.transport.camel.JamesCamelConstants;
 import org.apache.james.util.TimeConverter;
 import org.apache.mailet.HostAddress;
@@ -153,7 +151,7 @@ public abstract class AbstractRemoteDeli
     /** If false then ANY address errors will cause the transmission to fail */
     private boolean sendPartial = false;
 
-    private String outgoingQueueInjectorEndpoint = "direct:outgoingQueueInjectorEndpoint" + instance++;
+    protected String outgoingQueueInjectorEndpoint = "direct:outgoingQueueInjectorEndpoint" + instance++;
     
     /**
      * The amount of time JavaMail will wait before giving up on a socket
@@ -197,9 +195,7 @@ public abstract class AbstractRemoteDeli
 
     private CamelContext context;
 
-    private String outgoingQueue;
-
-    private String outgoingRetryQueue;
+    protected String outgoingQueue;
 
     @Resource(name = "producerTemplate")
     public void setProducerTemplate(ProducerTemplate producerTemplate) {
@@ -292,11 +288,6 @@ public abstract class AbstractRemoteDeli
             outgoingQueue = "outgoing";
         }
 
-        outgoingRetryQueue = getInitParameter("outgoingRetryQueue");
-        if (outgoingRetryQueue == null) {
-            outgoingRetryQueue = "outgoing.retry";
-        }
-
         try {
             if (getInitParameter("timeout") != null) {
                 smtpTimeout = Integer.parseInt(getInitParameter("timeout"));
@@ -367,12 +358,13 @@ public abstract class AbstractRemoteDeli
         }
 
         try {
-            getCamelContext().addRoutes(new RemoteDeliveryRouteBuilder());
+            getCamelContext().addRoutes(createRouteBuilder());
         } catch (Exception e) {
             throw new MessagingException("Unable to add camel route");
         }
     }
 
+    protected abstract RouteBuilder createRouteBuilder();
     /**
      * Calculates Total no. of attempts for the specified delayList.
      * 
@@ -1545,47 +1537,4 @@ public abstract class AbstractRemoteDeli
         this.context = context;
     }
 
-    /**
-     * RouteBuilder which builds the Camel Route for the whole RemoteDelivery
-     * Process.
-     * 
-     * 
-     * 
-     */
-    private final class RemoteDeliveryRouteBuilder extends RouteBuilder {
-        private Processor disposeProcessor = new DisposeProcessor();
-
-        @Override
-        public void configure() throws Exception {
-            
-            // we need to store the message to offsite storage so use claimcheck
-            from(outgoingQueueInjectorEndpoint).inOnly().beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint(outgoingQueue));
-            
-            from(getOutgoingQueueEndpoint(outgoingQueue)).inOnly().transacted()
-            .beanRef("mailEnricher")
-            .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).to(getOutgoingRetryQueueEndpoint(outgoingRetryQueue)).otherwise().beanRef("mailClaimCheck").process(disposeProcessor).stop().end();
-
-            fromF("pollingjms:queue?delay=30000&consumer.endpointUri=%s", getOutgoingRetryQueueEndpoint(outgoingRetryQueue)).inOnly().transacted()
-            .beanRef("mailEnricher")
-            .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).beanRef("mailClaimCheck").toF(getOutgoingRetryQueueEndpoint(outgoingRetryQueue)).otherwise().process(disposeProcessor).stop().end();
-        }
-
-    }
-
-    /**
-     * Return the Endpoint for the outgoing queue with the given name
-     * 
-     * @param outgoingQueue
-     * @return endpointUri
-     */
-    protected abstract String getOutgoingQueueEndpoint(String outgoingQueue);
-
-    /**
-     * Return the Endpoint for the outgoing retry queue with the given name
-     * 
-     * @param outgoingRetryQueue
-     * @return endpointUri
-     */
-    protected abstract String getOutgoingRetryQueueEndpoint(String outgoingRetryQueue);
-
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/ActiveMQRemoteDelivery.java Sun Sep 12 17:50:45 2010
@@ -18,6 +18,14 @@
  ****************************************************************/
 package org.apache.james.transport.mailets;
 
+import org.apache.activemq.ScheduledMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.james.transport.camel.DisposeProcessor;
+import org.apache.james.transport.camel.JamesCamelConstants;
+
 /**
  * RemoteDelivery implementation which use ActiveMQ for the outgoing spooling /
  * queue
@@ -26,15 +34,52 @@ package org.apache.james.transport.maile
  */
 public class ActiveMQRemoteDelivery extends AbstractRemoteDelivery {
 
-    @Override
-    protected String getOutgoingQueueEndpoint(String outgoingQueue) {
+    /**
+     * RouteBuilder which builds the Camel Route for the whole RemoteDelivery
+     * Process.
+     * 
+     * 
+     * 
+     */
+    private final class RemoteDeliveryRouteBuilder extends RouteBuilder {
+        private final Processor disposeProcessor = new DisposeProcessor();
+        private final Processor headerProcessor = new ActiveMQHeaderProcessor();
+        @Override
+        public void configure() throws Exception {
+            
+            // we need to store the message to offsite storage so use claimcheck
+            from(outgoingQueueInjectorEndpoint).inOnly().beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint());
+            
+            from(getOutgoingQueueEndpoint()).inOnly().transacted()
+            .beanRef("mailEnricher")
+            .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).process(headerProcessor).beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint()).otherwise().process(disposeProcessor).stop().end();
+        }
+
+    }
+    
+    private final class ActiveMQHeaderProcessor implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            Message in = exchange.getIn();
+            long nextDeliver = (Long) in.getHeader(JamesCamelConstants.JAMES_NEXT_DELIVERY);
+            long delay = nextDeliver - System.currentTimeMillis();
+            if (delay < 0) {
+                delay = 0;
+            }
+            in.setHeader(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
+            
+        }
+        
+    }
+
+    private String getOutgoingQueueEndpoint() {
         return "activemq:queue:" + outgoingQueue;
     }
 
-    @Override
-    protected String getOutgoingRetryQueueEndpoint(String outgoingRetryQueue) {
-        return "activemq:queue:" + outgoingRetryQueue;
 
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RemoteDeliveryRouteBuilder();
     }
 
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java?rev=996349&r1=996348&r2=996349&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/transport/mailets/JMSRemoteDelivery.java Sun Sep 12 17:50:45 2010
@@ -18,6 +18,13 @@
  ****************************************************************/
 package org.apache.james.transport.mailets;
 
+import javax.mail.MessagingException;
+
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.james.transport.camel.DisposeProcessor;
+import org.apache.james.transport.camel.JamesCamelConstants;
+
 /**
  * RemoteDelivery implementation which use JMS for the outgoing spooling /
  * queue.
@@ -28,15 +35,63 @@ package org.apache.james.transport.maile
  */
 public class JMSRemoteDelivery extends AbstractRemoteDelivery {
 
+    private String outgoingRetryQueue;
+
+
     @Override
-    protected String getOutgoingQueueEndpoint(String outgoingQueue) {
+    public void init() throws MessagingException {
+        super.init();
+
+        outgoingRetryQueue = getInitParameter("outgoingRetryQueue");
+        if (outgoingRetryQueue == null) {
+            outgoingRetryQueue = "outgoing.retry";
+        }
+
+    }
+
+
+    /**
+     * RouteBuilder which builds the Camel Route for the whole RemoteDelivery
+     * Process.
+     * 
+     * 
+     * 
+     */
+    private final class RemoteDeliveryRouteBuilder extends RouteBuilder {
+        private Processor disposeProcessor = new DisposeProcessor();
+
+        @Override
+        public void configure() throws Exception {
+            
+            // we need to store the message to offsite storage so use claimcheck
+            from(outgoingQueueInjectorEndpoint).inOnly().beanRef("mailClaimCheck").to(getOutgoingQueueEndpoint());
+            
+            from(getOutgoingQueueEndpoint()).inOnly().transacted()
+            .beanRef("mailEnricher")
+            .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).beanRef("mailClaimCheck").to(getOutgoingRetryQueueEndpoint()).otherwise().process(disposeProcessor).stop().end();
+
+            fromF("pollingjms:queue?delay=30000&consumer.endpointUri=%s", getOutgoingRetryQueueEndpoint()).inOnly().transacted()
+            .beanRef("mailEnricher")
+            .process(new DeliveryProcessor()).choice().when(header(JamesCamelConstants.JAMES_RETRY_DELIVERY).isNotNull()).beanRef("mailClaimCheck").toF(getOutgoingRetryQueueEndpoint()).otherwise().process(disposeProcessor).stop().end();
+        }
+
+    }
+
+
+    private String getOutgoingQueueEndpoint() {
         return "jms:queue:" + outgoingQueue;
     }
 
-    @Override
-    protected String getOutgoingRetryQueueEndpoint(String outgoingRetryQueue) {
+    
+    private String getOutgoingRetryQueueEndpoint() {
         return "jms:queue:" + outgoingRetryQueue;
 
     }
 
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RemoteDeliveryRouteBuilder();
+    }
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org