You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/11/04 12:11:53 UTC
svn commit: r1030883 - in /james/server/trunk:
queue-activemq/src/main/java/org/apache/james/queue/activemq/
queue-jms/src/main/java/org/apache/james/queue/jms/
Author: norman
Date: Thu Nov 4 11:11:53 2010
New Revision: 1030883
URL: http://svn.apache.org/viewvc?rev=1030883&view=rev
Log:
Switch from BytesMessages to ObjectMessage to avoid the copy of the whole bytearray when using SharedByteInputStream. Also make it configurable to use blobmessages or objectmessage via activemq
Added:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java
Removed:
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/BytesMessageInputStream.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/BytesMessageOutputStream.java
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Thu Nov 4 11:11:53 2010
@@ -25,13 +25,13 @@ import java.net.URL;
import java.util.Iterator;
import java.util.Map;
-import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.mail.MessagingException;
@@ -61,7 +61,8 @@ import org.springframework.jms.connectio
* primitives, then the toString() method is called on the attribute value to
* convert it
*
- * The implementation use {@link BlobMessage}
+ * The implementation use {@link BlobMessage} or {@link ObjectMessage}, depending on the constructor which was used
+ *
*
* See http://activemq.apache.org/blob-messages.html for more details
*
@@ -79,26 +80,32 @@ import org.springframework.jms.connectio
*
*/
public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
+
+ private boolean useBlob;
-
/**
- * Construct a new ActiveMQ based {@link MailQueue}. The messageTreshold is
- * used to calculate if a {@link BytesMessage} or a {@link BlobMessage}
- * should be used when queuing the mail in ActiveMQ. A {@link BlobMessage}
- * is used If the message size is bigger then the messageTreshold. The size
- * if in bytes.
+ * Construct a {@link ActiveMQMailQueue} which only use {@link BlobMessage}
+ *
+ * @see #ActiveMQMailQueue(ConnectionFactory, String, boolean, Log)
+ */
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, final Log logger) {
+ this(connectionFactory, queuename, true, logger);
+ }
+
+ /**
+ * Construct a new ActiveMQ based {@link MailQueue}.
*
*
- * For enabling the priority feature in AMQ see:
*
- * http://activemq.apache.org/how-can-i-support-priority-queues.html
*
* @param connectionFactory
* @param queuename
+ * @param useBlob
* @param logger
*/
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, final Log logger) {
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, boolean useBlob, final Log logger) {
super(connectionFactory, queuename, logger);
+ this.useBlob = useBlob;
}
/*
@@ -163,7 +170,7 @@ public class ActiveMQMailQueue extends J
* .jms.Message, org.apache.mailet.Mail)
*/
@SuppressWarnings("unchecked")
- protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException {
+ protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException, JMSException {
if (message instanceof BlobMessage) {
try {
BlobMessage blobMessage = (BlobMessage) message;
@@ -204,50 +211,53 @@ public class ActiveMQMailQueue extends J
protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
MessageProducer producer = null;
try {
-
- BlobMessage blobMessage = null;
- MimeMessage mm = mail.getMessage();
- MimeMessage wrapper = mm;
-
- ActiveMQSession amqSession = getAMQSession(session);
-
- if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
- wrapper = ((MimeMessageCopyOnWriteProxy)mm).getWrappedMessage();
- }
-
- if (wrapper instanceof MimeMessageWrapper) {
- URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
- String fromQueue = (String) mail.getAttribute(JAMES_QUEUE_NAME);
- MimeMessageWrapper mwrapper = (MimeMessageWrapper) wrapper;
-
- if (blobUrl != null && fromQueue != null && fromQueue.equals(queuename) && mwrapper.isModified() == false ) {
- // the message content was not changed so don't need to upload it again and can just point to the url
- blobMessage = amqSession.createBlobMessage(blobUrl);
+ // check if we should use a blob message here
+ if (useBlob) {
+ MimeMessage mm = mail.getMessage();
+ MimeMessage wrapper = mm;
+
+ ActiveMQSession amqSession = getAMQSession(session);
+
+ if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
+ wrapper = ((MimeMessageCopyOnWriteProxy)mm).getWrappedMessage();
+ }
+ BlobMessage blobMessage = null;
+ if (wrapper instanceof MimeMessageWrapper) {
+ URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
+ String fromQueue = (String) mail.getAttribute(JAMES_QUEUE_NAME);
+ MimeMessageWrapper mwrapper = (MimeMessageWrapper) wrapper;
+
+ if (blobUrl != null && fromQueue != null && fromQueue.equals(queuename) && mwrapper.isModified() == false ) {
+ // the message content was not changed so don't need to upload it again and can just point to the url
+ blobMessage = amqSession.createBlobMessage(blobUrl);
- // thats important so we don't delete the blob file after complete the processing!
- mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
+ // thats important so we don't delete the blob file after complete the processing!
+ mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
- }
+ }
- }
- if (blobMessage == null) {
- // just use the MimeMessageInputStream which can read every MimeMessage implementation
- blobMessage = amqSession.createBlobMessage(new MimeMessageInputStream(wrapper));
- }
+ }
+ if (blobMessage == null) {
+ // just use the MimeMessageInputStream which can read every MimeMessage implementation
+ blobMessage = amqSession.createBlobMessage(new MimeMessageInputStream(wrapper));
+ }
- // store the queue name in the props
- props.put(JAMES_QUEUE_NAME, queuename);
+ // store the queue name in the props
+ props.put(JAMES_QUEUE_NAME, queuename);
- Queue queue = session.createQueue(queuename);
+ Queue queue = session.createQueue(queuename);
- producer = session.createProducer(queue);
- Iterator<String> keys = props.keySet().iterator();
- while (keys.hasNext()) {
- String key = keys.next();
- blobMessage.setObjectProperty(key, props.get(key));
+ producer = session.createProducer(queue);
+ Iterator<String> keys = props.keySet().iterator();
+ while (keys.hasNext()) {
+ String key = keys.next();
+ blobMessage.setObjectProperty(key, props.get(key));
+ }
+ producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+ } else {
+ super.produceMail(session, props, msgPrio, mail);
}
- producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
} finally {
try {
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java Thu Nov 4 11:11:53 2010
@@ -31,14 +31,17 @@ import org.apache.james.queue.jms.JMSMai
*
*
*/
-public class ActiveMQMailQueueFactory extends JMSMailQueueFactory{
-
+public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
+
+ private boolean useBlob = true;
+ public void setUseBlobMessages(boolean useBlob){
+ this.useBlob = useBlob;
+ }
-
@Override
- protected MailQueue createMailQueue(String name) {
- return new ActiveMQMailQueue(connectionFactory, name, log);
- }
+ protected MailQueue createMailQueue(String name) {
+ return new ActiveMQMailQueue(connectionFactory, name, useBlob, log);
+ }
}
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java Thu Nov 4 11:11:53 2010
@@ -74,7 +74,8 @@ public class FileSystemBlobStrategy impl
}
out.flush();
out.close();
- return f.toURL();
+ // File.toURL() is deprecated
+ return f.toURI().toURL();
}
/*
Modified: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java (original)
+++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java Thu Nov 4 11:11:53 2010
@@ -18,6 +18,7 @@
****************************************************************/
package org.apache.james.queue.jms;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -36,6 +37,7 @@ import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.mail.MessagingException;
@@ -45,7 +47,6 @@ import javax.mail.internet.MimeMessage;
import org.apache.commons.logging.Log;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
-import org.apache.james.core.MimeMessageInputStreamSource;
import org.apache.james.queue.api.MailPrioritySupport;
import org.apache.james.queue.api.MailQueue;
import org.apache.mailet.Mail;
@@ -56,6 +57,8 @@ import org.apache.mailet.MailAddress;
* {@link MailQueue}. This implementation should work with every JMS 1.1.0
* implementation
*
+ * It use {@link ObjectMessage} with a byte array as payload to store the {@link Mail} objects.
+ *
*
*/
public class JMSMailQueue implements MailQueue, JMSSupport, MailPrioritySupport {
@@ -77,7 +80,6 @@ public class JMSMailQueue implements Mai
*
* Many JMS implementations support better solutions for this, so this should get overridden by these implementations
*
- * @see org.apache.james.queue.api.MailQueue#deQueue(org.apache.james.queue.api.api.MailQueue.DequeueOperation)
*/
public MailQueueItem deQueue() throws MailQueueException {
Connection connection = null;
@@ -245,7 +247,7 @@ public class JMSMailQueue implements Mai
Queue queue = session.createQueue(queuename);
producer = session.createProducer(queue);
- BytesMessage message = session.createBytesMessage();
+ ObjectMessage message = session.createObjectMessage();
Iterator<String> keys = props.keySet().iterator();
while(keys.hasNext()) {
@@ -253,7 +255,19 @@ public class JMSMailQueue implements Mai
message.setObjectProperty(key, props.get(key));
}
- mail.getMessage().writeTo(new BytesMessageOutputStream(message));
+ long size = mail.getMessageSize();
+ ByteArrayOutputStream out;
+ if (size > -1) {
+ out = new ByteArrayOutputStream((int)size);
+ } else {
+ out = new ByteArrayOutputStream();
+ }
+ mail.getMessage().writeTo(out);
+
+ // store the byte array in a ObjectMessage so we can use a SharedByteArrayInputStream later
+ // without the need of copy the day
+ message.setObject(out.toByteArray());
+
producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
} finally {
@@ -360,9 +374,9 @@ public class JMSMailQueue implements Mai
* @param mail
* @throws MessagingException
*/
- protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException {
- if (message instanceof BytesMessage) {
- mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageInputStreamSource(mail.getName(), new BytesMessageInputStream((BytesMessage) message))));
+ protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException, JMSException {
+ if (message instanceof ObjectMessage) {
+ mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageObjectMessageSource((ObjectMessage)message)));
} else {
throw new MailQueueException("Not supported JMS Message received " + message);
}
Added: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java?rev=1030883&view=auto
==============================================================================
--- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java (added)
+++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java Thu Nov 4 11:11:53 2010
@@ -0,0 +1,97 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.core.MimeMessageSource;
+import org.apache.james.lifecycle.Disposable;
+import org.apache.james.lifecycle.LifecycleUtil;
+
+/**
+ * {@link MimeMessageSource} implementation which reads the data from the payload of an {@link ObjectMessage}.
+ * Its important that the payload is a byte array otherwise it will throw an {@link ClassCastException}
+ *
+ */
+public class MimeMessageObjectMessageSource extends MimeMessageSource implements Disposable{
+
+ private final ObjectMessage message;
+ private final SharedByteArrayInputStream in;
+ private final String id;
+ private byte[] content;
+
+ public MimeMessageObjectMessageSource(ObjectMessage message) throws JMSException {
+ this.message = message;
+ this.id = message.getJMSMessageID();
+ this.content = (byte[]) message.getObject();
+ in = new SharedByteArrayInputStream(content);
+ }
+
+ @Override
+ public long getMessageSize() throws IOException {
+ return content.length;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.core.MimeMessageSource#getInputStream()
+ */
+ public InputStream getInputStream() throws IOException {
+ return in.newStream(0, -1);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.core.MimeMessageSource#getSourceId()
+ */
+ public String getSourceId() {
+ return id;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.lifecycle.Disposable#dispose()
+ */
+ public void dispose() {
+ try {
+ in.close();
+ } catch (IOException e1) {
+ // ignore on dispose
+ }
+ LifecycleUtil.dispose(in);
+
+ try {
+ message.clearBody();
+ } catch (JMSException e) {
+ // ignore on dispose
+ }
+ try {
+ message.clearProperties();
+ } catch (JMSException e) {
+ // ignore on dispose
+ }
+ content = null;
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org