You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/09/19 14:50:00 UTC
svn commit: r998647 - in /james/server/trunk: ./
spoolmanager/src/main/java/org/apache/james/queue/
Author: norman
Date: Sun Sep 19 12:50:00 2010
New Revision: 998647
URL: http://svn.apache.org/viewvc?rev=998647&view=rev
Log:
Use BytesMessage for queue to eliminate unnecessary byte copy
Added:
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java
Modified:
james/server/trunk/pom.xml
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
Modified: james/server/trunk/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/pom.xml?rev=998647&r1=998646&r2=998647&view=diff
==============================================================================
--- james/server/trunk/pom.xml (original)
+++ james/server/trunk/pom.xml Sun Sep 19 12:50:00 2010
@@ -993,7 +993,6 @@
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq.version}</version>
- <scope>runtime</scope>
</dependency>
<dependency>
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java?rev=998647&r1=998646&r2=998647&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueue.java Sun Sep 19 12:50:00 2010
@@ -18,8 +18,6 @@
****************************************************************/
package org.apache.james.queue;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -29,18 +27,19 @@ import java.util.List;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;
+import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.AddressException;
+import org.apache.activemq.BlobMessage;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.core.MimeMessageInputStreamSource;
@@ -56,12 +55,15 @@ import org.apache.mailet.MailAddress;
* When a {@link Mail} attribute is found and is not one of the supported primitives, then the
* toString() method is called on the attribute value to convert it
*
+ * TODO: Make it possible to use {@link BlobMessage} for large messages
+ *
*
*/
public class ActiveMQMailQueue implements MailQueue {
private final String queuename;
private final ConnectionFactory connectionFactory;
+ private long messageTreshold = -1;
private final static String JAMES_MAIL_RECIPIENTS = "JAMES_MAIL_RECIPIENTS";
private final static String JAMES_MAIL_SENDER = "JAMES_MAIL_SENDER";
@@ -76,11 +78,13 @@ public class ActiveMQMailQueue implement
private final static String JAMES_MAIL_ATTRIBUTE_NAMES = "JAMES_MAIL_ATTRIBUTE_NAMES";
private final static int NO_DELAY = -1;
- public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename) {
+ public ActiveMQMailQueue(final ConnectionFactory connectionFactory, final String queuename, long messageTreshold) {
this.connectionFactory = connectionFactory;
this.queuename = queuename;
+ this.messageTreshold = messageTreshold;
}
+
/*
* (non-Javadoc)
* @see org.apache.james.queue.MailQueue#deQueue()
@@ -96,7 +100,7 @@ public class ActiveMQMailQueue implement
Queue queue = session.createQueue(queuename);
MessageConsumer consumer = session.createConsumer(queue);
- Mail mail = createMail((ObjectMessage)consumer.receive());
+ Mail mail = createMail((BytesMessage)consumer.receive());
operation.process(mail);
session.commit();
} catch (JMSException e) {
@@ -142,7 +146,6 @@ public class ActiveMQMailQueue implement
Session session = null;
try {
-
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -176,6 +179,7 @@ public class ActiveMQMailQueue implement
}
+
/*
* (non-Javadoc)
* @see org.apache.james.queue.MailQueue#enQueue(org.apache.mailet.Mail)
@@ -190,8 +194,27 @@ public class ActiveMQMailQueue implement
return "MailQueue:" + queuename;
}
- private Mail createMail(ObjectMessage message) throws MailQueueException, JMSException {
+    /**
+     * Build a {@link Mail} from a received {@link BytesMessage}.
+     *
+     * The JMS properties are copied onto the Mail via populateMail, and the
+     * body of the message is handed to the Mail as a stream-backed mime message.
+     *
+     * @param message the JMS message to convert
+     * @return mail the Mail built from the message
+     * @throws MailQueueException if the mime message could not be prepared
+     * @throws JMSException if reading from the JMS message failed
+     */
+    private Mail createMail(BytesMessage message) throws MailQueueException, JMSException {
+        MailImpl mail = new MailImpl();
+        populateMail(message, mail);
+
+        try {
+            // The message body is not copied here; it is wrapped as a stream source
+            MimeMessageInputStreamSource source = new MimeMessageInputStreamSource(mail.getName(), new BytesMessageInputStream(message));
+            mail.setMessage(new MimeMessageCopyOnWriteProxy(source));
+        } catch (MessagingException e) {
+            throw new MailQueueException("Unable to prepare Mail for dequeue", e);
+        }
+        return mail;
+    }
+
+ /**
+ * Populate the Mail with values from the Message. This excludes the Mail message itself
+ *
+ * @param message
+ * @param mail
+ * @throws JMSException
+ */
+ private void populateMail(Message message, MailImpl mail) throws JMSException {
mail.setErrorMessage(message.getStringProperty(JAMES_MAIL_ERROR_MESSAGE));
mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED)));
@@ -235,74 +258,15 @@ public class ActiveMQMailQueue implement
}
mail.setState(message.getStringProperty(JAMES_MAIL_STATE));
-
- try {
- mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageInputStreamSource(mail.getName(), new ByteArrayInputStream((byte[])message.getObject()))));
- } catch (MessagingException e) {
- throw new MailQueueException("Unable to prepare Mail for dequeue", e);
- }
- return mail;
+
}
-
- @SuppressWarnings("unchecked")
private Message createMessage(Session session, Mail mail, long delayInMillis) throws MailQueueException{
try {
- ObjectMessage message = session.createObjectMessage();
+ BytesMessage message = session.createBytesMessage();
-
- if (delayInMillis > 0) {
- // This will get picked up by activemq for delay message
- message.setLongProperty(org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
- }
-
- message.setStringProperty(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
- message.setLongProperty(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
- message.setLongProperty(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
- message.setStringProperty(JAMES_MAIL_NAME, mail.getName());
-
- StringBuilder recipientsBuilder = new StringBuilder();
-
- Iterator<MailAddress> recipients = mail.getRecipients().iterator();
- while (recipients.hasNext()) {
- String recipient = recipients.next().toString();
- recipientsBuilder.append(recipient.trim());
- if (recipients.hasNext()) {
- recipientsBuilder.append(JAMES_MAIL_SEPERATOR);
- }
- }
- message.setStringProperty(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
- message.setStringProperty(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
- message.setStringProperty(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
-
- String sender;
- MailAddress s = mail.getSender();
- if (s == null) {
- sender = "";
- } else {
- sender = mail.getSender().toString();
- }
-
- StringBuilder attrsBuilder = new StringBuilder();
- Iterator<String> attrs = mail.getAttributeNames();
- while (attrs.hasNext()) {
- String attrName = attrs.next();
- attrsBuilder.append(attrName);
-
- Object value = convertAttributeValue(mail.getAttribute(attrName));
- message.setObjectProperty(attrName, value);
-
- if (attrs.hasNext()) {
- attrsBuilder.append(JAMES_MAIL_SEPERATOR);
- }
- }
- message.setStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
- message.setStringProperty(JAMES_MAIL_SENDER, sender);
- message.setStringProperty(JAMES_MAIL_STATE, mail.getState());
-
-
- ByteArrayOutputStream messageStream = new ByteArrayOutputStream();
- mail.getMessage().writeTo(messageStream);
- message.setObject(messageStream.toByteArray());
+ populateJMSProperties(message, mail, delayInMillis);
+
+ mail.getMessage().writeTo(new BytesMessageOutputStream(message));;
return message;
} catch (MessagingException e) {
@@ -314,6 +278,68 @@ public class ActiveMQMailQueue implement
}
}
+
+    /**
+     * Copy the state of the given {@link Mail} onto the JMS {@link Message} as
+     * message properties. The mime message itself is not written here.
+     *
+     * Recipients and attribute names are each stored as a single String, joined
+     * with JAMES_MAIL_SEPERATOR. Every attribute value is additionally stored
+     * under its own name after being passed through convertAttributeValue.
+     *
+     * @param message the JMS message to set the properties on
+     * @param mail the Mail to read the values from
+     * @param delayInMillis delay for the delivery; only set when greater than 0
+     * @throws JMSException if a property could not be set
+     * @throws MessagingException declared for callers; see the Mail accessors used
+     */
+    @SuppressWarnings("unchecked")
+    private void populateJMSProperties(Message message, Mail mail, long delayInMillis) throws JMSException, MessagingException {
+        // This will get picked up by activemq for delay message
+        if (delayInMillis > 0) {
+            message.setLongProperty(org.apache.activemq.ScheduledMessage.AMQ_SCHEDULED_DELAY, delayInMillis);
+        }
+
+        message.setStringProperty(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
+        message.setLongProperty(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
+        message.setLongProperty(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
+        message.setStringProperty(JAMES_MAIL_NAME, mail.getName());
+
+        // Join all recipients into one separator-delimited String
+        StringBuilder joinedRecipients = new StringBuilder();
+        for (Iterator<MailAddress> it = mail.getRecipients().iterator(); it.hasNext();) {
+            joinedRecipients.append(it.next().toString().trim());
+            if (it.hasNext()) {
+                joinedRecipients.append(JAMES_MAIL_SEPERATOR);
+            }
+        }
+        message.setStringProperty(JAMES_MAIL_RECIPIENTS, joinedRecipients.toString());
+        message.setStringProperty(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
+        message.setStringProperty(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());
+
+        // A missing sender is stored as the empty String
+        MailAddress from = mail.getSender();
+        String sender = (from == null) ? "" : mail.getSender().toString();
+
+        // Store each attribute under its own name and remember the names
+        StringBuilder joinedAttrNames = new StringBuilder();
+        for (Iterator<String> names = mail.getAttributeNames(); names.hasNext();) {
+            String name = names.next();
+            joinedAttrNames.append(name);
+
+            Object converted = convertAttributeValue(mail.getAttribute(name));
+            message.setObjectProperty(name, converted);
+
+            if (names.hasNext()) {
+                joinedAttrNames.append(JAMES_MAIL_SEPERATOR);
+            }
+        }
+        message.setStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES, joinedAttrNames.toString());
+        message.setStringProperty(JAMES_MAIL_SENDER, sender);
+        message.setStringProperty(JAMES_MAIL_STATE, mail.getState());
+    }
/**
* Convert the attribute value if necessary.
*
@@ -326,4 +352,5 @@ public class ActiveMQMailQueue implement
}
return value.toString();
}
+
}
Modified: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java?rev=998647&r1=998646&r2=998647&view=diff
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java (original)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/ActiveMQMailQueueFactory.java Sun Sep 19 12:50:00 2010
@@ -50,7 +50,7 @@ public class ActiveMQMailQueueFactory im
public synchronized MailQueue getQueue(String name) {
MailQueue queue = queues.get(name);
if (queue == null) {
- queue = new ActiveMQMailQueue(connectionFactory, name);
+ queue = new ActiveMQMailQueue(connectionFactory, name, -1);
queues.put(name, queue);
}
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java?rev=998647&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageInputStream.java Sun Sep 19 12:50:00 2010
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+
+/**
+ * Provide an {@link InputStream} around a {@link BytesMessage}.
+ *
+ * Single bytes are returned as unsigned values (0-255) as the
+ * {@link InputStream} contract requires, so -1 is only returned once the
+ * end of the message body was reached.
+ */
+public class BytesMessageInputStream extends InputStream {
+
+    private final BytesMessage message;
+
+    /**
+     * @param message the message whose body should be read
+     */
+    public BytesMessageInputStream(BytesMessage message) {
+        this.message = message;
+    }
+
+    @Override
+    public boolean markSupported() {
+        return false;
+    }
+
+    /**
+     * Read up to <code>len</code> bytes of the message body into
+     * <code>b</code>, starting at index <code>off</code>.
+     *
+     * @return the number of bytes stored in <code>b</code>, or -1 if the end
+     *         of the message body was reached
+     * @throws IndexOutOfBoundsException if off/len do not fit into b
+     * @throws IOException if the message could not be read
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (off < 0 || len < 0 || len > b.length - off) {
+            throw new IndexOutOfBoundsException();
+        }
+        if (len == 0) {
+            return 0;
+        }
+        try {
+            if (off == 0) {
+                return message.readBytes(b, len);
+            }
+            // BytesMessage can only fill an array from index 0, so read into
+            // a scratch buffer and copy to the requested offset
+            byte[] chunk = new byte[len];
+            int count = message.readBytes(chunk, len);
+            if (count > 0) {
+                System.arraycopy(chunk, 0, b, off, count);
+            }
+            return count;
+        } catch (JMSException e) {
+            throw new IOException("Unable to read from message", e);
+        }
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * Read the next byte of the message body.
+     *
+     * @return the byte as value between 0 and 255, or -1 at the end of the body
+     * @throws IOException if the message could not be read
+     */
+    @Override
+    public int read() throws IOException {
+        try {
+            // readByte() is signed: a 0xFF byte would be returned as -1 and
+            // be mistaken for the end of the stream
+            return message.readUnsignedByte();
+        } catch (MessageEOFException e) {
+            return -1;
+        } catch (JMSException e) {
+            throw new IOException("Unable to read from message", e);
+        }
+    }
+
+    /**
+     * Return the underlying {@link BytesMessage}
+     *
+     * @return message
+     */
+    public BytesMessage getMessage() {
+        return message;
+    }
+
+}
Added: james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java?rev=998647&view=auto
==============================================================================
--- james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java (added)
+++ james/server/trunk/spoolmanager/src/main/java/org/apache/james/queue/BytesMessageOutputStream.java Sun Sep 19 12:50:00 2010
@@ -0,0 +1,77 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.queue;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+
+/**
+ * Provide an {@link OutputStream} over a {@link BytesMessage}.
+ *
+ * Everything written to the stream is appended to the body of the message.
+ */
+public class BytesMessageOutputStream extends OutputStream {
+
+    private final BytesMessage message;
+
+    /**
+     * @param message the message to which the written bytes are appended
+     */
+    public BytesMessageOutputStream(BytesMessage message) {
+        this.message = message;
+    }
+
+    /**
+     * Write the low-order byte of <code>b</code> to the message.
+     *
+     * @throws IOException if the message could not be written
+     */
+    @Override
+    public void write(int b) throws IOException {
+        try {
+            // OutputStream.write(int) must emit exactly one byte; writeInt
+            // would append four bytes per call and corrupt the content
+            message.writeByte((byte) b);
+        } catch (JMSException e) {
+            throw new IOException("Unable to write to message", e);
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        try {
+            message.writeBytes(b, off, len);
+        } catch (JMSException e) {
+            throw new IOException("Unable to write to message", e);
+        }
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        try {
+            message.writeBytes(b);
+        } catch (JMSException e) {
+            throw new IOException("Unable to write to message", e);
+        }
+    }
+
+    /**
+     * Return the underlying {@link BytesMessage}
+     *
+     * @return message
+     */
+    public BytesMessage getMessage() {
+        return message;
+    }
+
+}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org