You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/10/18 21:56:20 UTC
svn commit: r1023968 - in /james/server/trunk:
queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
Author: norman
Date: Mon Oct 18 19:56:20 2010
New Revision: 1023968
URL: http://svn.apache.org/viewvc?rev=1023968&view=rev
Log:
Just some refactoring to make it easier to extend JMSMailQueue
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1023968&r1=1023967&r2=1023968&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Mon Oct 18 19:56:20 2010
@@ -20,6 +20,8 @@ package org.apache.james.queue.activemq;
import java.io.IOException;
import java.net.MalformedURLException;
+import java.util.Iterator;
+import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
@@ -27,6 +29,8 @@ import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
@@ -152,8 +156,7 @@ public class ActiveMQMailQueue extends J
* org.apache.james.queue.jms.JMSMailQueue#createMessage(javax.jms.Session,
* org.apache.mailet.Mail, long)
*/
- protected Message createMessage(Session session, Mail mail, long delayInMillis) throws JMSException, IOException, MessagingException {
-
+ protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
boolean useBlob = false;
if (messageTreshold != -1) {
try {
@@ -166,16 +169,36 @@ public class ActiveMQMailQueue extends J
}
}
if (useBlob) {
- ActiveMQSession amqSession;
- if (session instanceof PooledSession) {
- amqSession = ((PooledSession) session).getInternalSession();
- } else {
- amqSession = (ActiveMQSession) session;
+ MessageProducer producer = null;
+ try {
+ ActiveMQSession amqSession;
+ if (session instanceof PooledSession) {
+ amqSession = ((PooledSession) session).getInternalSession();
+ } else {
+ amqSession = (ActiveMQSession) session;
+ }
+ BlobMessage message = amqSession.createBlobMessage(new MimeMessageInputStream(mail.getMessage()));
+ Queue queue = session.createQueue(queuename);
+
+ producer = session.createProducer(queue);
+ Iterator<String> keys = props.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ message.setObjectProperty(key, props.get(key));
+ }
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+ } finally {
+
+ try {
+ if (producer != null)
+ producer.close();
+ } catch (JMSException e) {
+ // ignore here
+ }
}
- BlobMessage message = amqSession.createBlobMessage(new MimeMessageInputStream(mail.getMessage()));
- return message;
+
} else {
- return super.createMessage(session, mail, delayInMillis);
+ super.produceMail(session, props, msgPrio, mail);
}
}
Modified: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1023968&r1=1023967&r2=1023968&view=diff
==============================================================================
--- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java (original)
+++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java Mon Oct 18 19:56:20 2010
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
@@ -199,7 +201,6 @@ public class JMSMailQueue implements Mai
public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException {
Connection connection = null;
Session session = null;
- MessageProducer producer = null;
long mydelay = 0;
@@ -212,18 +213,17 @@ public class JMSMailQueue implements Mai
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue queue = session.createQueue(queuename);
- producer = session.createProducer(queue);
- Message message = createMessage(session, mail, mydelay);
- populateJMSProperties(message, mail, mydelay);
-
+
int msgPrio = NORMAL_PRIORITY;
Object prio = mail.getAttribute(MAIL_PRIORITY);
if (prio instanceof Integer) {
msgPrio = (Integer) prio;
}
-
- producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+
+ Map<String, Object> props = getJMSProperties(mail, mydelay);
+
+ produceMail(session, props, msgPrio, mail);
+
} catch (Exception e) {
if (session != null) {
try {
@@ -235,13 +235,6 @@ public class JMSMailQueue implements Mai
throw new MailQueueException("Unable to enqueue mail " + mail, e);
} finally {
-
- try {
- if (producer != null)
- producer.close();
- } catch (JMSException e) {
- // ignore here
- }
try {
if (session != null)
session.close();
@@ -268,25 +261,42 @@ public class JMSMailQueue implements Mai
}
/**
- * Create Message which holds the {@link MimeMessage} of the given Mail
- *
- * @param session
- * @param mail
- * @return jmsMessage
- * @throws JMSException
- * @throws IOException
- * @throws MessagingException
- * @throws IOException
+ * Produce the mail to the JMS Queue
*/
- protected Message createMessage(Session session, Mail mail, long delayInMillis) throws JMSException, MessagingException, IOException {
- BytesMessage message = session.createBytesMessage();
- mail.getMessage().writeTo(new BytesMessageOutputStream(message));
+ protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
+ MessageProducer producer = null;
+
+ try {
+ Queue queue = session.createQueue(queuename);
+
+ producer = session.createProducer(queue);
+ BytesMessage message = session.createBytesMessage();
+
+ Iterator<String> keys = props.keySet().iterator();
+ while(keys.hasNext()) {
+ String key = keys.next();
+ message.setObjectProperty(key, props.get(key));
+ }
+
+ mail.getMessage().writeTo(new BytesMessageOutputStream(message));
+ producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+
+ } finally {
+
+ try {
+ if (producer != null)
+ producer.close();
+ } catch (JMSException e) {
+ // ignore here
+ }
+ }
+
+
- return message;
}
/**
- * Populate JMS Message properties with values
+ * Get JMS Message properties with values
*
* @param message
* @param mail
@@ -295,17 +305,18 @@ public class JMSMailQueue implements Mai
* @throws MessagingException
*/
@SuppressWarnings("unchecked")
- protected void populateJMSProperties(Message message, Mail mail, long delayInMillis) throws JMSException, MessagingException {
+ protected Map<String,Object> getJMSProperties(Mail mail, long delayInMillis) throws JMSException, MessagingException {
+ Map<String, Object> props = new HashMap<String, Object>();
long nextDelivery = -1;
if (delayInMillis > 0) {
nextDelivery = System.currentTimeMillis() + delayInMillis;
}
- message.setLongProperty(JAMES_NEXT_DELIVERY, nextDelivery);
- message.setStringProperty(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
- message.setLongProperty(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
- message.setLongProperty(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
- message.setStringProperty(JAMES_MAIL_NAME, mail.getName());
+ props.put(JAMES_NEXT_DELIVERY, nextDelivery);
+ props.put(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
+ props.put(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
+ props.put(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
+ props.put(JAMES_MAIL_NAME, mail.getName());
StringBuilder recipientsBuilder = new StringBuilder();
@@ -317,9 +328,9 @@ public class JMSMailQueue implements Mai
recipientsBuilder.append(JAMES_MAIL_SEPERATOR);
}
}
- message.setStringProperty(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
- message.setStringProperty(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
- message.setStringProperty(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
+ props.put(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
+ props.put(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
+ props.put(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
String sender;
MailAddress s = mail.getSender();
@@ -336,16 +347,16 @@ public class JMSMailQueue implements Mai
attrsBuilder.append(attrName);
Object value = convertAttributeValue(mail.getAttribute(attrName));
- message.setObjectProperty(attrName, value);
+ props.put(attrName, value);
if (attrs.hasNext()) {
attrsBuilder.append(JAMES_MAIL_SEPERATOR);
}
}
- message.setStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
- message.setStringProperty(JAMES_MAIL_SENDER, sender);
- message.setStringProperty(JAMES_MAIL_STATE, mail.getState());
-
+ props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
+ props.put(JAMES_MAIL_SENDER, sender);
+ props.put(JAMES_MAIL_STATE, mail.getState());
+ return props;
}
/**
@@ -392,7 +403,6 @@ public class JMSMailQueue implements Mai
* @throws JMSException
*/
protected void populateMail(Message message, MailImpl mail) throws JMSException {
-
mail.setErrorMessage(message.getStringProperty(JAMES_MAIL_ERROR_MESSAGE));
mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED)));
mail.setName(message.getStringProperty(JAMES_MAIL_NAME));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org