You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/11/04 12:11:53 UTC

svn commit: r1030883 - in /james/server/trunk: queue-activemq/src/main/java/org/apache/james/queue/activemq/ queue-jms/src/main/java/org/apache/james/queue/jms/

Author: norman
Date: Thu Nov  4 11:11:53 2010
New Revision: 1030883

URL: http://svn.apache.org/viewvc?rev=1030883&view=rev
Log:
Switch from BytesMessage to ObjectMessage to avoid copying the whole byte array when using SharedByteArrayInputStream. Also make it configurable whether the ActiveMQ queue uses BlobMessage or ObjectMessage

Added:
    james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java
Removed:
    james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/BytesMessageInputStream.java
    james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/BytesMessageOutputStream.java
Modified:
    james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
    james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
    james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java

Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Thu Nov  4 11:11:53 2010
@@ -25,13 +25,13 @@ import java.net.URL;
 import java.util.Iterator;
 import java.util.Map;
 
-import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.mail.MessagingException;
@@ -61,7 +61,8 @@ import org.springframework.jms.connectio
  * primitives, then the toString() method is called on the attribute value to
  * convert it
  * 
- * The implementation use {@link BlobMessage} 
+ * The implementation uses {@link BlobMessage} or {@link ObjectMessage}, depending on which constructor was used
+ * 
  * 
  * See http://activemq.apache.org/blob-messages.html for more details
  * 
@@ -79,26 +80,32 @@ import org.springframework.jms.connectio
  * 
  */
 public class ActiveMQMailQueue extends JMSMailQueue implements ActiveMQSupport{
+    
+    private boolean useBlob;
 
-
     /**
-     * Construct a new ActiveMQ based {@link MailQueue}. The messageTreshold is
-     * used to calculate if a {@link BytesMessage} or a {@link BlobMessage}
-     * should be used when queuing the mail in ActiveMQ. A {@link BlobMessage}
-     * is used If the message size is bigger then the messageTreshold. The size
-     * if in bytes.
+     * Construct an {@link ActiveMQMailQueue} which only uses {@link BlobMessage}
+     * 
+     * @see #ActiveMQMailQueue(ConnectionFactory, String, boolean, Log)
+     */
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, final Log logger) {
+        this(connectionFactory, queuename, true, logger);
+    }
+    
+    /**
+     * Construct a new ActiveMQ based {@link MailQueue}.
      * 
      * 
-     * For enabling the priority feature in AMQ see:
      * 
-     * http://activemq.apache.org/how-can-i-support-priority-queues.html
      * 
      * @param connectionFactory
      * @param queuename
+     * @param useBlob
      * @param logger
      */
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, final Log logger) {
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, boolean useBlob, final Log logger) {
         super(connectionFactory, queuename, logger);
+        this.useBlob = useBlob;
     }
     
     /*
@@ -163,7 +170,7 @@ public class ActiveMQMailQueue extends J
      * .jms.Message, org.apache.mailet.Mail)
      */
     @SuppressWarnings("unchecked")
-    protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException {
+    protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException, JMSException {
         if (message instanceof BlobMessage) {
             try {
                 BlobMessage blobMessage = (BlobMessage) message;
@@ -204,50 +211,53 @@ public class ActiveMQMailQueue extends J
     protected void produceMail(Session session, Map<String,Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
         MessageProducer producer = null;
         try {
-            
-            BlobMessage blobMessage = null;
-            MimeMessage mm = mail.getMessage();
-            MimeMessage wrapper = mm;
-            
-            ActiveMQSession amqSession = getAMQSession(session);
-            
-            if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
-                wrapper = ((MimeMessageCopyOnWriteProxy)mm).getWrappedMessage();
-            }
-            
-            if (wrapper instanceof MimeMessageWrapper) {
-                URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
-                String fromQueue = (String) mail.getAttribute(JAMES_QUEUE_NAME);
-                MimeMessageWrapper mwrapper = (MimeMessageWrapper) wrapper;
-
-                if (blobUrl != null && fromQueue != null && fromQueue.equals(queuename) && mwrapper.isModified() == false ) {
-                    // the message content was not changed so don't need to upload it again and can just point to the url
-                    blobMessage = amqSession.createBlobMessage(blobUrl);
+            // check if we should use a blob message here
+            if (useBlob) { 
+                MimeMessage mm = mail.getMessage();
+                MimeMessage wrapper = mm;
+                
+                ActiveMQSession amqSession = getAMQSession(session);
+                
+                if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
+                    wrapper = ((MimeMessageCopyOnWriteProxy)mm).getWrappedMessage();
+                }
+                BlobMessage blobMessage = null;
+                if (wrapper instanceof MimeMessageWrapper) {
+                    URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
+                    String fromQueue = (String) mail.getAttribute(JAMES_QUEUE_NAME);
+                    MimeMessageWrapper mwrapper = (MimeMessageWrapper) wrapper;
+
+                    if (blobUrl != null && fromQueue != null && fromQueue.equals(queuename) && mwrapper.isModified() == false ) {
+                        // the message content was not changed so don't need to upload it again and can just point to the url
+                        blobMessage = amqSession.createBlobMessage(blobUrl);
                     
-                    // thats important so we don't delete the blob file after complete the processing!
-                    mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
+                        // that's important so we don't delete the blob file after completing the processing!
+                        mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
                     
-                }
+                    }
 
-            }
-            if (blobMessage == null) {
-                // just use the MimeMessageInputStream which can read every MimeMessage implementation
-                blobMessage = amqSession.createBlobMessage(new MimeMessageInputStream(wrapper));
-            }
+                }
+                if (blobMessage == null) {
+                    // just use the MimeMessageInputStream which can read every MimeMessage implementation
+                    blobMessage = amqSession.createBlobMessage(new MimeMessageInputStream(wrapper));
+                }
             
-            // store the queue name in the props
-            props.put(JAMES_QUEUE_NAME, queuename);
+                // store the queue name in the props
+                props.put(JAMES_QUEUE_NAME, queuename);
 
               
-            Queue queue = session.createQueue(queuename);
+                Queue queue = session.createQueue(queuename);
 
-            producer = session.createProducer(queue);
-            Iterator<String> keys = props.keySet().iterator();
-            while (keys.hasNext()) {
-                String key = keys.next();
-                blobMessage.setObjectProperty(key, props.get(key));
+                producer = session.createProducer(queue);
+                Iterator<String> keys = props.keySet().iterator();
+                while (keys.hasNext()) {
+                    String key = keys.next();
+                    blobMessage.setObjectProperty(key, props.get(key));
+                }
+                producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+            } else {
+                super.produceMail(session, props, msgPrio, mail);
             }
-            producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
         } finally {
 
             try {

Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java Thu Nov  4 11:11:53 2010
@@ -31,14 +31,17 @@ import org.apache.james.queue.jms.JMSMai
  * 
  *
  */
-public class ActiveMQMailQueueFactory extends JMSMailQueueFactory{
-  
+public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
+
+    private boolean useBlob = true;
     
+    public void setUseBlobMessages(boolean useBlob){
+        this.useBlob = useBlob;
+    }
     
-
     @Override
-	protected MailQueue createMailQueue(String name) {
-        return new ActiveMQMailQueue(connectionFactory, name, log);
-	}
+    protected MailQueue createMailQueue(String name) {
+        return new ActiveMQMailQueue(connectionFactory, name, useBlob, log);
+    }
 
 }

Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java Thu Nov  4 11:11:53 2010
@@ -74,7 +74,8 @@ public class FileSystemBlobStrategy impl
         }
         out.flush();
         out.close();
-        return f.toURL();
+        // File.toURL() is deprecated
+        return f.toURI().toURL();
     }
 
     /*

Modified: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java?rev=1030883&r1=1030882&r2=1030883&view=diff
==============================================================================
--- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java (original)
+++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/JMSMailQueue.java Thu Nov  4 11:11:53 2010
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.queue.jms;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -36,6 +37,7 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.mail.MessagingException;
@@ -45,7 +47,6 @@ import javax.mail.internet.MimeMessage;
 import org.apache.commons.logging.Log;
 import org.apache.james.core.MailImpl;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
-import org.apache.james.core.MimeMessageInputStreamSource;
 import org.apache.james.queue.api.MailPrioritySupport;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.mailet.Mail;
@@ -56,6 +57,8 @@ import org.apache.mailet.MailAddress;
  * {@link MailQueue}. This implementation should work with every JMS 1.1.0
  * implementation
  * 
+ * It uses {@link ObjectMessage} with a byte array as payload to store the {@link Mail} objects.
+ * 
  * 
  */
 public class JMSMailQueue implements MailQueue, JMSSupport, MailPrioritySupport {
@@ -77,7 +80,6 @@ public class JMSMailQueue implements Mai
      * 
      * Many JMS implementations support better solutions for this, so this should get overridden by these implementations
      * 
-     * @see org.apache.james.queue.api.MailQueue#deQueue(org.apache.james.queue.api.api.MailQueue.DequeueOperation)
      */
     public MailQueueItem deQueue() throws MailQueueException {
         Connection connection = null;
@@ -245,7 +247,7 @@ public class JMSMailQueue implements Mai
             Queue queue = session.createQueue(queuename);
 
             producer = session.createProducer(queue);
-            BytesMessage message = session.createBytesMessage();
+            ObjectMessage message = session.createObjectMessage();
 
             Iterator<String> keys = props.keySet().iterator();
             while(keys.hasNext()) {
@@ -253,7 +255,19 @@ public class JMSMailQueue implements Mai
                 message.setObjectProperty(key, props.get(key));
             }
             
-            mail.getMessage().writeTo(new BytesMessageOutputStream(message));
+            long size = mail.getMessageSize();
+            ByteArrayOutputStream out;
+            if (size > -1) {
+                out = new ByteArrayOutputStream((int)size);
+            } else {
+                out = new ByteArrayOutputStream();
+            }
+            mail.getMessage().writeTo(out);
+            
+            // store the byte array in an ObjectMessage so we can use a SharedByteArrayInputStream later
+            // without the need to copy the data
+            message.setObject(out.toByteArray());
+
             producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
             
         } finally {
@@ -360,9 +374,9 @@ public class JMSMailQueue implements Mai
      * @param mail
      * @throws MessagingException
      */
-    protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException {
-        if (message instanceof BytesMessage) {
-            mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageInputStreamSource(mail.getName(), new BytesMessageInputStream((BytesMessage) message))));
+    protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException, JMSException {
+        if (message instanceof ObjectMessage) {
+            mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageObjectMessageSource((ObjectMessage)message)));
         } else {
             throw new MailQueueException("Not supported JMS Message received " + message);
         }

Added: james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java?rev=1030883&view=auto
==============================================================================
--- james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java (added)
+++ james/server/trunk/queue-jms/src/main/java/org/apache/james/queue/jms/MimeMessageObjectMessageSource.java Thu Nov  4 11:11:53 2010
@@ -0,0 +1,97 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue.jms;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+import javax.mail.util.SharedByteArrayInputStream;
+
+import org.apache.james.core.MimeMessageSource;
+import org.apache.james.lifecycle.Disposable;
+import org.apache.james.lifecycle.LifecycleUtil;
+
+/**
+ * {@link MimeMessageSource} implementation which reads its data from the payload of an {@link ObjectMessage}.
+ * The payload must be a byte array; otherwise the constructor throws a {@link ClassCastException}.
+ * The array is wrapped in a {@link SharedByteArrayInputStream}; {@link #dispose()} clears the JMS message.
+ */
+public class MimeMessageObjectMessageSource extends MimeMessageSource implements Disposable{
+
+    private final ObjectMessage message;
+    private final SharedByteArrayInputStream in;
+    private final String id;
+    private byte[] content;
+
+    public MimeMessageObjectMessageSource(ObjectMessage message) throws JMSException {
+        this.message = message;
+        this.id = message.getJMSMessageID();
+        this.content = (byte[]) message.getObject(); // ClassCastException if the payload is not a byte[]
+        in = new SharedByteArrayInputStream(content);
+    }
+
+    @Override
+    public long getMessageSize() throws IOException {
+        return content.length; // NOTE: content is set to null by dispose(), so this fails after disposal
+    }
+
+    /*
+     * Returns a new stream on every call; newStream(0, -1) spans from offset 0 to the end of the shared stream.
+     * @see org.apache.james.core.MimeMessageSource#getInputStream()
+     */
+    public InputStream getInputStream() throws IOException {
+        return in.newStream(0, -1);
+    }
+
+    /*
+     * Returns the JMS message ID that was read from the message in the constructor.
+     * @see org.apache.james.core.MimeMessageSource#getSourceId()
+     */
+    public String getSourceId() {
+        return id;
+    }
+
+    /*
+     * Closes the shared stream, clears body and properties of the JMS message and drops the byte array.
+     * @see org.apache.james.lifecycle.Disposable#dispose()
+     */
+    public void dispose() {
+        try {
+            in.close();
+        } catch (IOException e1) {
+            // best effort: a failing close must not abort the rest of dispose()
+        }
+        LifecycleUtil.dispose(in);
+
+        try {
+            message.clearBody();
+        } catch (JMSException e) {
+            // best effort: still try to clear the properties and release the content
+        }
+        try {
+            message.clearProperties();
+        } catch (JMSException e) {
+            // best effort: still release the content reference below
+        }
+        content = null;
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org