You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2011/05/11 15:37:05 UTC
svn commit: r1101872 - in
/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq:
ActiveMQMailQueue.java FileSystemBlobStrategy.java
MimeMessageBlobMessageSource.java
Author: norman
Date: Wed May 11 13:37:04 2011
New Revision: 1101872
URL: http://svn.apache.org/viewvc?rev=1101872&view=rev
Log:
Fix a race condition that could cause problems when the content of a mail is shared because the mail is queued multiple times. See JAMES-1240
Modified:
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1101872&r1=1101871&r2=1101872&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Wed May 11 13:37:04 2011
@@ -19,9 +19,7 @@
package org.apache.james.queue.activemq;
import java.io.IOException;
-import java.io.InputStream;
import java.net.MalformedURLException;
-import java.net.URL;
import java.util.List;
import java.util.Map;
@@ -38,7 +36,6 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
-import javax.mail.internet.SharedInputStream;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
@@ -46,9 +43,7 @@ import org.apache.activemq.command.Activ
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.core.MimeMessageInputStream;
-import org.apache.james.core.MimeMessageInputStreamSource;
import org.apache.james.core.MimeMessageSource;
-import org.apache.james.core.MimeMessageWrapper;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.jms.JMSMailQueue;
import org.apache.mailet.Mail;
@@ -137,23 +132,9 @@ public class ActiveMQMailQueue extends J
// Ignore on error
logger.debug("Unable to get url from blobmessage for mail " + mail.getName());
}
- InputStream in = blobMessage.getInputStream();
- MimeMessageSource source;
-
- // if its a SharedInputStream we can make use of some more
- // performant implementation which don't need to copy the
- // message to a temporary file
- if (in instanceof SharedInputStream) {
- String sourceId = message.getJMSMessageID();
- long size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
- source = new MimeMessageBlobMessageSource((SharedInputStream) in, size, sourceId);
- } else {
- source = new MimeMessageInputStreamSource(mail.getName(), in);
- }
-
+ MimeMessageSource source = new MimeMessageBlobMessageSource(blobMessage);
mail.setMessage(new MimeMessageCopyOnWriteProxy(source));
- } catch (IOException e) {
- throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e);
+
} catch (JMSException e) {
throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e);
}
@@ -162,6 +143,7 @@ public class ActiveMQMailQueue extends J
}
}
+
/*
* (non-Javadoc)
*
@@ -182,10 +164,21 @@ public class ActiveMQMailQueue extends J
MimeMessage wrapper = mm;
ActiveMQSession amqSession = getAMQSession(session);
-
+
+ /*
+ * Remove this optimization as it could lead to problems when the same blob content
+ * is shared across different messages.
+ *
+ * I still think it would be a good idea to somehow do this but at the moment it's just
+ * safer to disable it.
+ *
+ * TODO: Re-enable it once it works!
+ *
+ * See JAMES-1240
if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
wrapper = ((MimeMessageCopyOnWriteProxy) mm).getWrappedMessage();
}
+
if (wrapper instanceof MimeMessageWrapper) {
URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
String fromQueue = (String) mail.getAttribute(JAMES_QUEUE_NAME);
@@ -195,21 +188,17 @@ public class ActiveMQMailQueue extends J
// the message content was not changed so don't need to
// upload it again and can just point to the url
blobMessage = amqSession.createBlobMessage(blobUrl);
-
- // thats important so we don't delete the blob file
- // after complete the processing!
- mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
reuse = true;
-
}
- }
+ }*/
if (blobMessage == null) {
// just use the MimeMessageInputStream which can read every
// MimeMessage implementation
blobMessage = amqSession.createBlobMessage(new MimeMessageInputStream(wrapper));
}
-
+
+
// store the queue name in the props
props.put(JAMES_QUEUE_NAME, queuename);
@@ -220,10 +209,14 @@ public class ActiveMQMailQueue extends J
blobMessage.setObjectProperty(entry.getKey(), entry.getValue());
}
producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+
+
+
} else {
super.produceMail(session, props, msgPrio, mail);
}
} catch (JMSException e) {
+ e.printStackTrace();
if (!reuse && blobMessage != null && blobMessage instanceof ActiveMQBlobMessage) {
((ActiveMQBlobMessage) blobMessage).deleteFile();
}
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1101872&r1=1101871&r2=1101872&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java Wed May 11 13:37:04 2011
@@ -25,10 +25,8 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
-import java.util.concurrent.ConcurrentHashMap;
import javax.jms.JMSException;
-import javax.mail.util.SharedFileInputStream;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.blob.BlobDownloadStrategy;
@@ -46,7 +44,6 @@ public class FileSystemBlobStrategy impl
private final FileSystem fs;
private final BlobTransferPolicy policy;
- private final ConcurrentHashMap<String, SharedFileInputStream> map = new ConcurrentHashMap<String, SharedFileInputStream>();
private int splitCount;
public FileSystemBlobStrategy(final BlobTransferPolicy policy, final FileSystem fs, int splitCount) {
@@ -115,13 +112,6 @@ public class FileSystemBlobStrategy impl
*/
public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
File f = getFile(message);
- SharedFileInputStream in = map.remove(f.getCanonicalPath());
- try {
- if (in != null)
- in.close();
- } catch (IOException e) {
- // ignore here
- }
if (f.exists()) {
if (f.delete() == false) {
throw new IOException("Unable to delete file " + f);
@@ -134,15 +124,7 @@ public class FileSystemBlobStrategy impl
*/
public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
File f = getFile(message);
- String key = f.getCanonicalPath();
- // use exactly one SharedFileInputStream per file so we can keep track
- // of filehandles
- // See JAMES-1122
- SharedFileInputStream in = map.putIfAbsent(key, new SharedFileInputStream(f));
- if (in == null) {
- in = map.get(key);
- }
- return in;
+ return new FileInputStream(f);
}
/**
Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java?rev=1101872&r1=1101871&r2=1101872&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java Wed May 11 13:37:04 2011
@@ -20,28 +20,23 @@ package org.apache.james.queue.activemq;
import java.io.IOException;
import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-import javax.mail.internet.SharedInputStream;
+import javax.jms.JMSException;
+import org.apache.activemq.BlobMessage;
import org.apache.james.core.MimeMessageSource;
-import org.apache.james.lifecycle.api.Disposable;
/**
*
*/
-public class MimeMessageBlobMessageSource extends MimeMessageSource implements ActiveMQSupport, Disposable {
+public class MimeMessageBlobMessageSource extends MimeMessageSource implements ActiveMQSupport {
- private SharedInputStream in;
private String sourceId;
- private long size;
- private List<InputStream> streams = new ArrayList<InputStream>();
+ private BlobMessage message;
- public MimeMessageBlobMessageSource(SharedInputStream in, long size, String sourceId) {
- this.in = in;
- this.size = size;
- this.sourceId = sourceId;
+ public MimeMessageBlobMessageSource(BlobMessage message) throws JMSException {
+ this.message = message;
+ this.sourceId = message.getJMSMessageID();
}
/*
@@ -49,10 +44,12 @@ public class MimeMessageBlobMessageSourc
*
* @see org.apache.james.core.MimeMessageSource#getInputStream()
*/
- public synchronized InputStream getInputStream() throws IOException {
- InputStream sin = in.newStream(0, -1);
- streams.add(sin);
- return sin;
+ public InputStream getInputStream() throws IOException {
+ try {
+ return message.getInputStream();
+ } catch (JMSException e) {
+ throw new IOException("Unable to open stream", e);
+ }
}
/*
@@ -66,34 +63,18 @@ public class MimeMessageBlobMessageSourc
@Override
public long getMessageSize() throws IOException {
- // if the size is < 1 we seems to not had it stored in the property, so
- // fallback to super implementation
- if (size == -1) {
- super.getMessageSize();
- }
- return size;
- }
-
- /**
- * Call dispose on the {@link InputStream}
- */
- public synchronized void dispose() {
-
try {
- ((InputStream) in).close();
- } catch (IOException e) {
- // ignore on dispose
- }
- in = null;
- for (int i = 0; i < streams.size(); i++) {
- InputStream s = streams.get(i);
- try {
- s.close();
- } catch (IOException e) {
- // ignore on dispose
+ long size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+
+ // if the size is -1 it seems it was not stored in the property, so
+ // fall back to the super implementation
+ if (size == -1) {
+ super.getMessageSize();
}
- s = null;
+ return size;
+ } catch (JMSException e) {
+ throw new IOException("Unable to get message size", e);
}
- streams.clear();
+
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org