You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/10/18 21:56:20 UTC

svn commit: r1023968 - in /james/server/trunk: queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java

Author: norman
Date: Mon Oct 18 19:56:20 2010
New Revision: 1023968

URL: http://svn.apache.org/viewvc?rev=1023968&view=rev
Log:
Just some refactoring to make it easier to extend JMSMailQueue

Modified:
    james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java

Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1023968&r1=1023967&r2=1023968&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Mon Oct 18 19:56:20 2010
@@ -20,6 +20,8 @@ package org.apache.james.queue.activemq;
 
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.util.Iterator;
+import java.util.Map;
 
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
@@ -27,6 +29,8 @@ import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
@@ -152,8 +156,7 @@ public class ActiveMQMailQueue extends J
      * org.apache.james.queue.jms.JMSMailQueue#createMessage(javax.jms.Session,
      * org.apache.mailet.Mail, long)
      */
-    protected Message createMessage(Session session, Mail mail, long delayInMillis) throws JMSException, IOException, MessagingException {
-
+    protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
         boolean useBlob = false;
         if (messageTreshold != -1) {
             try {
@@ -166,16 +169,36 @@ public class ActiveMQMailQueue extends J
             }
         }
         if (useBlob) {
-            ActiveMQSession amqSession;
-            if (session instanceof PooledSession) {
-                amqSession = ((PooledSession) session).getInternalSession();
-            } else {
-                amqSession = (ActiveMQSession) session;
+            MessageProducer producer = null;
+            try {
+                ActiveMQSession amqSession;
+                if (session instanceof PooledSession) {
+                    amqSession = ((PooledSession) session).getInternalSession();
+                } else {
+                    amqSession = (ActiveMQSession) session;
+                }
+                BlobMessage message = amqSession.createBlobMessage(new MimeMessageInputStream(mail.getMessage()));
+                Queue queue = session.createQueue(queuename);
+
+                producer = session.createProducer(queue);
+                Iterator<String> keys = props.keySet().iterator();
+                while (keys.hasNext()) {
+                    String key = keys.next();
+                    message.setObjectProperty(key, props.get(key));
+                }
+                producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+            } finally {
+
+                try {
+                    if (producer != null)
+                        producer.close();
+                } catch (JMSException e) {
+                    // ignore here
+                }
             }
-            BlobMessage message = amqSession.createBlobMessage(new MimeMessageInputStream(mail.getMessage()));
-            return message;
+            
         } else {
-            return super.createMessage(session, mail, delayInMillis);
+            super.produceMail(session, props, msgPrio, mail);
         }
 
     }

Modified: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1023968&r1=1023967&r2=1023968&view=diff
==============================================================================
--- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java (original)
+++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java Mon Oct 18 19:56:20 2010
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.StringTokenizer;
 import java.util.concurrent.TimeUnit;
 
@@ -199,7 +201,6 @@ public class JMSMailQueue implements Mai
     public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException {
         Connection connection = null;
         Session session = null;
-        MessageProducer producer = null;
 
         long mydelay = 0;
 
@@ -212,18 +213,17 @@ public class JMSMailQueue implements Mai
             connection = connectionFactory.createConnection();
             connection.start();
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue = session.createQueue(queuename);
-            producer = session.createProducer(queue);
-            Message message = createMessage(session, mail, mydelay);
-            populateJMSProperties(message, mail, mydelay);
-
+            
             int msgPrio = NORMAL_PRIORITY;
             Object prio = mail.getAttribute(MAIL_PRIORITY);
             if (prio instanceof Integer) {
                 msgPrio = (Integer) prio;
             }
-
-            producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+            
+            Map<String, Object> props = getJMSProperties(mail, mydelay);
+            
+            produceMail(session, props, msgPrio, mail);
+            
         } catch (Exception e) {
             if (session != null) {
                 try {
@@ -235,13 +235,6 @@ public class JMSMailQueue implements Mai
             throw new MailQueueException("Unable to enqueue mail " + mail, e);
 
         } finally {
-
-            try {
-                if (producer != null)
-                    producer.close();
-            } catch (JMSException e) {
-                // ignore here
-            }
             try {
                 if (session != null)
                     session.close();
@@ -268,25 +261,42 @@ public class JMSMailQueue implements Mai
     }
 
     /**
-     * Create Message which holds the {@link MimeMessage} of the given Mail
-     * 
-     * @param session
-     * @param mail
-     * @return jmsMessage
-     * @throws JMSException
-     * @throws IOException
-     * @throws MessagingException
-     * @throws IOException
+     * Send the given mail to the JMS queue, setting the supplied properties on the message and using the given priority
      */
-    protected Message createMessage(Session session, Mail mail, long delayInMillis) throws JMSException, MessagingException, IOException {
-        BytesMessage message = session.createBytesMessage();
-        mail.getMessage().writeTo(new BytesMessageOutputStream(message));
+    protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
+        MessageProducer producer = null;
+
+        try {
+            Queue queue = session.createQueue(queuename);
+
+            producer = session.createProducer(queue);
+            BytesMessage message = session.createBytesMessage();
+
+            Iterator<String> keys = props.keySet().iterator();
+            while(keys.hasNext()) {
+                String key = keys.next();
+                message.setObjectProperty(key, props.get(key));
+            }
+            
+            mail.getMessage().writeTo(new BytesMessageOutputStream(message));
+            producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+            
+        } finally {
+
+            try {
+                if (producer != null)
+                    producer.close();
+            } catch (JMSException e) {
+                // ignore here
+            }
+        }
+        
+        
         
-        return message;
     }
 
     /**
-     * Populate JMS Message properties with values
+     * Build a map holding the JMS message properties for the given mail
      * 
      * @param message
      * @param mail
@@ -295,17 +305,18 @@ public class JMSMailQueue implements Mai
      * @throws MessagingException
      */
     @SuppressWarnings("unchecked")
-    protected void populateJMSProperties(Message message, Mail mail, long delayInMillis) throws JMSException, MessagingException {
+    protected Map<String,Object> getJMSProperties(Mail mail, long delayInMillis) throws JMSException, MessagingException {
+        Map<String, Object> props = new HashMap<String, Object>();
         long nextDelivery = -1;
         if (delayInMillis > 0) {
             nextDelivery = System.currentTimeMillis() + delayInMillis;
 
         }
-        message.setLongProperty(JAMES_NEXT_DELIVERY, nextDelivery);
-        message.setStringProperty(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
-        message.setLongProperty(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
-        message.setLongProperty(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
-        message.setStringProperty(JAMES_MAIL_NAME, mail.getName());
+        props.put(JAMES_NEXT_DELIVERY, nextDelivery);
+        props.put(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
+        props.put(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
+        props.put(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
+        props.put(JAMES_MAIL_NAME, mail.getName());
 
         StringBuilder recipientsBuilder = new StringBuilder();
 
@@ -317,9 +328,9 @@ public class JMSMailQueue implements Mai
                 recipientsBuilder.append(JAMES_MAIL_SEPERATOR);
             }
         }
-        message.setStringProperty(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
-        message.setStringProperty(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
-        message.setStringProperty(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
+        props.put(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
+        props.put(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
+        props.put(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
 
         String sender;
         MailAddress s = mail.getSender();
@@ -336,16 +347,16 @@ public class JMSMailQueue implements Mai
             attrsBuilder.append(attrName);
 
             Object value = convertAttributeValue(mail.getAttribute(attrName));
-            message.setObjectProperty(attrName, value);
+            props.put(attrName, value);
 
             if (attrs.hasNext()) {
                 attrsBuilder.append(JAMES_MAIL_SEPERATOR);
             }
         }
-        message.setStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
-        message.setStringProperty(JAMES_MAIL_SENDER, sender);
-        message.setStringProperty(JAMES_MAIL_STATE, mail.getState());
-
+        props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
+        props.put(JAMES_MAIL_SENDER, sender);
+        props.put(JAMES_MAIL_STATE, mail.getState());
+        return props;
     }
 
     /**
@@ -392,7 +403,6 @@ public class JMSMailQueue implements Mai
      * @throws JMSException
      */
     protected void populateMail(Message message, MailImpl mail) throws JMSException {
-
         mail.setErrorMessage(message.getStringProperty(JAMES_MAIL_ERROR_MESSAGE));
         mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED)));
         mail.setName(message.getStringProperty(JAMES_MAIL_NAME));



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org