You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/09/19 14:50:00 UTC

svn commit: r998647 - in /james/server/trunk: ./ spoolmanager/src/main/java/org/apache/james/queue/

Author: norman
Date: Sun Sep 19 12:50:00 2010
New Revision: 998647

URL: http://svn.apache.org/viewvc?rev=998647&view=rev
Log:
Use BytesMessage for queue to eliminate unnecessary byte copy

Added:
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java
Modified:
    james/server/trunk/pom.xml
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
    james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java

Modified: james/server/trunk/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/pom.xml?rev=998647&r1=998646&r2=998647&view=diff
==============================================================================
--- james/server/trunk/pom.xml (original)
+++ james/server/trunk/pom.xml Sun Sep 19 12:50:00 2010
@@ -993,7 +993,6 @@
       <groupId>org.apache.activemq</groupId>
       <artifactId>activemq-pool</artifactId>
       <version>${activemq.version}</version>
-      <scope>runtime</scope>
     </dependency>
     
     <dependency>

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java?rev=998647&r1=998646&r2=998647&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java Sun Sep 19 12:50:00 2010
@@ -18,8 +18,6 @@
  ****************************************************************/
 package org.apache.james.queue;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -29,18 +27,19 @@ import java.util.List;
 import java.util.StringTokenizer;
 import java.util.concurrent.TimeUnit;
 
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.mail.MessagingException;
 import javax.mail.internet.AddressException;
 
+import org.apache.activemq.BlobMessage;
 import org.apache.james.core.MailImpl;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.core.MimeMessageInputStreamSource;
@@ -56,12 +55,15 @@ import org.apache.mailet.MailAddress;
  * When a {@link Mail} attribute is found and is not one of the supported primitives, then the 
  * toString() method is called on the attribute value to convert it 
  * 
+ * TODO: Make it possible to use {@link BlobMessage} for large messages
+ * 
  *
  */
 public class ActiveMQMailQueue implements MailQueue {
 
     private final String queuename;
     private final ConnectionFactory connectionFactory;
+    private long messageTreshold = -1;
 
     private final static String JAMES_MAIL_RECIPIENTS = "JAMES_MAIL_RECIPIENTS";
     private final static String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
@@ -76,11 +78,13 @@ public class ActiveMQMailQueue implement
     private final static String JAMES_MAIL_ATTRIBUTE_NAMES = "JAMES_MAIL_ATTRIBUTE_NAMES";
     private final static int NO_DELAY = -1;
     
-    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename) {
+    public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, long messageTreshold) {
         this.connectionFactory = connectionFactory;     
         this.queuename = queuename;
+        this.messageTreshold  = messageTreshold;
     }
     
+    
     /*
      * (non-Javadoc)
      * @see org.apache.james.queue.MailQueue#deQueue()
@@ -96,7 +100,7 @@ public class ActiveMQMailQueue implement
             Queue queue = session.createQueue(queuename);
             MessageConsumer consumer = session.createConsumer(queue);
             
-            Mail mail = createMail((ObjectMessage)consumer.receive());
+            Mail mail = createMail((BytesMessage)consumer.receive());
             operation.process(mail);
             session.commit();
         } catch (JMSException e) {
@@ -142,7 +146,6 @@ public class ActiveMQMailQueue implement
         Session session = null;
         try {
 
-            
             connection = connectionFactory.createConnection();
             connection.start();
             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -176,6 +179,7 @@ public class ActiveMQMailQueue implement
       
     }
 
+    
     /*
      * (non-Javadoc)
      * @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail)
@@ -190,8 +194,27 @@ public class ActiveMQMailQueue implement
         return "MailQueue:" + queuename;
     }
     
-    private Mail createMail(ObjectMessage message) throws MailQueueException, JMSException {
+    private Mail createMail(BytesMessage message) throws MailQueueException, JMSException {
         MailImpl mail = new MailImpl();
+        populateMail(message, mail);
+        
+        try {
+            
+            mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageInputStreamSource(mail.getName(), new BytesMessageInputStream(message))));
+        } catch (MessagingException e) {
+            throw new MailQueueException("Unable to prepare Mail for dequeue", e);
+        }
+        return mail; 
+    }
+    
+    /**
+     * Populate Mail with values from Message. This exclude the Mail message
+     * 
+     * @param message
+     * @param mail
+     * @throws JMSException
+     */
+    private void populateMail(Message message, MailImpl mail) throws JMSException {
 
         mail.setErrorMessage(message.getStringProperty(JAMES_MAIL_ERROR_MESSAGE));
         mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED)));
@@ -235,74 +258,15 @@ public class ActiveMQMailQueue implement
         }
         
         mail.setState(message.getStringProperty(JAMES_MAIL_STATE));
-            
-        try {
-            mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageInputStreamSource(mail.getName(), new ByteArrayInputStream((byte[])message.getObject()))));
-        } catch (MessagingException e) {
-            throw new MailQueueException("Unable to prepare Mail for dequeue", e);
-        }
-        return mail; 
+
     }
-    
-    @SuppressWarnings("unchecked")
     private Message createMessage(Session session, Mail mail, long delayInMillis) throws MailQueueException{
         try {
-            ObjectMessage message  = session.createObjectMessage();
+            BytesMessage message  = session.createBytesMessage();
      
-            
-            if (delayInMillis > 0) {
-                // This will get picked up by activemq for delay message
-                message.setLongProperty(org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
-            }
-            
-            message.setStringProperty(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
-            message.setLongProperty(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
-            message.setLongProperty(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
-            message.setStringProperty(JAMES_MAIL_NAME, mail.getName());
-            
-            StringBuilder recipientsBuilder = new StringBuilder();
-            
-            Iterator<MailAddress> recipients = mail.getRecipients().iterator();
-            while (recipients.hasNext()) {
-                String recipient = recipients.next().toString();
-                recipientsBuilder.append(recipient.trim());
-                if (recipients.hasNext()) {
-                    recipientsBuilder.append(JAMES_MAIL_SEPERATOR);
-                }
-            }
-            message.setStringProperty(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
-            message.setStringProperty(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
-            message.setStringProperty(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
-            
-            String sender;
-            MailAddress s = mail.getSender();
-            if (s == null) {
-                sender = "";
-            } else {
-                sender = mail.getSender().toString();
-            }
-            
-            StringBuilder attrsBuilder = new StringBuilder();
-            Iterator<String> attrs = mail.getAttributeNames();
-            while (attrs.hasNext()) {
-                String attrName = attrs.next();
-                attrsBuilder.append(attrName);
-                
-                Object value = convertAttributeValue(mail.getAttribute(attrName));
-                message.setObjectProperty(attrName, value);
-                
-                if (attrs.hasNext()) {
-                    attrsBuilder.append(JAMES_MAIL_SEPERATOR);
-                }
-            }
-            message.setStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
-            message.setStringProperty(JAMES_MAIL_SENDER, sender);
-            message.setStringProperty(JAMES_MAIL_STATE, mail.getState());
-            
-            
-            ByteArrayOutputStream messageStream = new ByteArrayOutputStream();
-            mail.getMessage().writeTo(messageStream);
-            message.setObject(messageStream.toByteArray());
+            populateJMSProperties(message, mail, delayInMillis);
+                        
+            mail.getMessage().writeTo(new BytesMessageOutputStream(message));;
             return message;
 
         } catch (MessagingException e) {
@@ -314,6 +278,68 @@ public class ActiveMQMailQueue implement
         }
     }
     
+    
+    /**
+     * Populate JMS Message properties with values 
+     * 
+     * @param message
+     * @param mail
+     * @param delayInMillis
+     * @throws JMSException
+     * @throws MessagingException
+     */
+    @SuppressWarnings("unchecked")
+    private void populateJMSProperties(Message message, Mail mail, long delayInMillis) throws JMSException, MessagingException {
+        if (delayInMillis > 0) {
+            // This will get picked up by activemq for delay message
+            message.setLongProperty(org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
+        }
+        
+        message.setStringProperty(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
+        message.setLongProperty(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
+        message.setLongProperty(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
+        message.setStringProperty(JAMES_MAIL_NAME, mail.getName());
+        
+        StringBuilder recipientsBuilder = new StringBuilder();
+        
+        Iterator<MailAddress> recipients = mail.getRecipients().iterator();
+        while (recipients.hasNext()) {
+            String recipient = recipients.next().toString();
+            recipientsBuilder.append(recipient.trim());
+            if (recipients.hasNext()) {
+                recipientsBuilder.append(JAMES_MAIL_SEPERATOR);
+            }
+        }
+        message.setStringProperty(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
+        message.setStringProperty(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
+        message.setStringProperty(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
+        
+        String sender;
+        MailAddress s = mail.getSender();
+        if (s == null) {
+            sender = "";
+        } else {
+            sender = mail.getSender().toString();
+        }
+        
+        StringBuilder attrsBuilder = new StringBuilder();
+        Iterator<String> attrs = mail.getAttributeNames();
+        while (attrs.hasNext()) {
+            String attrName = attrs.next();
+            attrsBuilder.append(attrName);
+            
+            Object value = convertAttributeValue(mail.getAttribute(attrName));
+            message.setObjectProperty(attrName, value);
+            
+            if (attrs.hasNext()) {
+                attrsBuilder.append(JAMES_MAIL_SEPERATOR);
+            }
+        }
+        message.setStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
+        message.setStringProperty(JAMES_MAIL_SENDER, sender);
+        message.setStringProperty(JAMES_MAIL_STATE, mail.getState());
+                   
+    }
     /**
      * Convert the attribute value if necessary. 
      * 
@@ -326,4 +352,5 @@ public class ActiveMQMailQueue implement
         }
         return value.toString();
     }
+    
 }

Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java?rev=998647&r1=998646&r2=998647&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java Sun Sep 19 12:50:00 2010
@@ -50,7 +50,7 @@ public class ActiveMQMailQueueFactory im
     public synchronized MailQueue getQueue(String name) {
         MailQueue queue = queues.get(name);
         if (queue == null) {
-            queue = new ActiveMQMailQueue(connectionFactory, name);
+            queue = new ActiveMQMailQueue(connectionFactory, name, -1);
             queues.put(name, queue);
         }
 

Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java?rev=998647&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java Sun Sep 19 12:50:00 2010
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+
+/**
+ * Provide an {@link InputStream} around a {@link BytesMessage}
+ * 
+ *
+ */
+public class BytesMessageInputStream extends InputStream {
+
+    private BytesMessage message;
+    public BytesMessageInputStream(BytesMessage message) {
+        this.message = message;
+        
+    }
+    
+
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        int r = 0;
+        for (int i = 0; i < len; i++) {
+            int a = read();
+            if (a == -1) {
+                if (i == 0) {
+                    return -1;
+                } else {
+                    break;
+                }
+            }
+            
+            r += a;
+            b[off + i] = (byte) a;
+        }
+        return r;
+    }
+
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        try {
+            int i =  message.readBytes(b);
+            
+            return i;
+        } catch (JMSException e) {
+            throw new IOException("Unable to read from message", e);
+        }
+    }
+
+    @Override
+    public int read() throws IOException {
+        try {
+            int i =  message.readByte();
+           
+            return i;
+        } catch (MessageEOFException e) {
+            return -1;
+        } catch (JMSException e) {
+            throw new IOException("Unable to read from message", e);
+        }
+    }
+    
+    /**
+     * Return the underlying {@link BytesMessage}
+     * 
+     * @return message
+     */
+    public BytesMessage getMessage() {
+        return message;
+    }
+    
+}

Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java?rev=998647&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java Sun Sep 19 12:50:00 2010
@@ -0,0 +1,77 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.queue;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+/**
+ * Provide an {@link OutputStream} over an {@link BytesMessage}
+ * 
+ *
+ */
+public class BytesMessageOutputStream extends OutputStream {
+
+    private BytesMessage message;
+   
+    public BytesMessageOutputStream(BytesMessage message) {
+        this.message = message;
+    }
+    
+    
+    @Override
+    public void write(int b) throws IOException {
+        try {
+            message.writeInt(b);
+        } catch (JMSException e) {
+            throw new IOException("Unable to write to message", e);
+        }
+    }
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        try {
+            message.writeBytes(b, off, len);
+        } catch (JMSException e) {
+            throw new IOException("Unable to write to message", e);
+        }
+    }
+    @Override
+    public void write(byte[] b) throws IOException {
+        try {
+            message.writeBytes(b);
+        } catch (JMSException e) {
+            throw new IOException("Unable to write to message", e);
+        }
+    }
+    
+    /**
+     * Return the underlying {@link BytesMessage}
+     * 
+     * @return message
+     */
+    public BytesMessage getMessage() {
+        return message;
+    }
+    
+}
+
+



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org