You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2011/05/11 15:37:05 UTC

svn commit: r1101872 - in /james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq: ActiveMQMailQueue.java FileSystemBlobStrategy.java MimeMessageBlobMessageSource.java

Author: norman
Date: Wed May 11 13:37:04 2011
New Revision: 1101872

URL: http://svn.apache.org/viewvc?rev=1101872&view=rev
Log:
Fix a race condition that could cause problems when mail content is shared because the same mail is queued multiple times. See JAMES-1240.

Modified:
    james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
    james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
    james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java

Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java?rev=1101872&r1=1101871&r2=1101872&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueue.java Wed May 11 13:37:04 2011
@@ -19,9 +19,7 @@
 package org.apache.james.queue.activemq;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.MalformedURLException;
-import java.net.URL;
 import java.util.List;
 import java.util.Map;
 
@@ -38,7 +36,6 @@ import javax.jms.Session;
 import javax.jms.TemporaryQueue;
 import javax.mail.MessagingException;
 import javax.mail.internet.MimeMessage;
-import javax.mail.internet.SharedInputStream;
 
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.BlobMessage;
@@ -46,9 +43,7 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.util.JMSExceptionSupport;
 import org.apache.james.core.MimeMessageCopyOnWriteProxy;
 import org.apache.james.core.MimeMessageInputStream;
-import org.apache.james.core.MimeMessageInputStreamSource;
 import org.apache.james.core.MimeMessageSource;
-import org.apache.james.core.MimeMessageWrapper;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.jms.JMSMailQueue;
 import org.apache.mailet.Mail;
@@ -137,23 +132,9 @@ public class ActiveMQMailQueue extends J
                     // Ignore on error
                     logger.debug("Unable to get url from blobmessage for mail " + mail.getName());
                 }
-                InputStream in = blobMessage.getInputStream();
-                MimeMessageSource source;
-
-                // if its a SharedInputStream we can make use of some more
-                // performant implementation which don't need to copy the
-                // message to a temporary file
-                if (in instanceof SharedInputStream) {
-                    String sourceId = message.getJMSMessageID();
-                    long size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
-                    source = new MimeMessageBlobMessageSource((SharedInputStream) in, size, sourceId);
-                } else {
-                    source = new MimeMessageInputStreamSource(mail.getName(), in);
-                }
-
+                MimeMessageSource source = new MimeMessageBlobMessageSource(blobMessage);
                 mail.setMessage(new MimeMessageCopyOnWriteProxy(source));
-            } catch (IOException e) {
-                throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e);
+            
             } catch (JMSException e) {
                 throw new MailQueueException("Unable to populate MimeMessage for mail " + mail.getName(), e);
             }
@@ -162,6 +143,7 @@ public class ActiveMQMailQueue extends J
         }
     }
 
+    
     /*
      * (non-Javadoc)
      * 
@@ -182,10 +164,21 @@ public class ActiveMQMailQueue extends J
                 MimeMessage wrapper = mm;
 
                 ActiveMQSession amqSession = getAMQSession(session);
-
+                
+                /*
+                 * Remove this optimization as it could lead to problems when the same blob content
+                 * is shared across different messages. 
+                 * 
+                 * I still think it would be a good idea to somehow do this but at the moment it's just 
+                 * safer to disable it.
+                 * 
+                 * TODO: Re-Enable it again once it works!
+                 * 
+                 * See JAMES-1240
                 if (wrapper instanceof MimeMessageCopyOnWriteProxy) {
                     wrapper = ((MimeMessageCopyOnWriteProxy) mm).getWrappedMessage();
                 }
+
                 if (wrapper instanceof MimeMessageWrapper) {
                     URL blobUrl = (URL) mail.getAttribute(JAMES_BLOB_URL);
                     String fromQueue = (String) mail.getAttribute(JAMES_QUEUE_NAME);
@@ -195,21 +188,17 @@ public class ActiveMQMailQueue extends J
                         // the message content was not changed so don't need to
                         // upload it again and can just point to the url
                         blobMessage = amqSession.createBlobMessage(blobUrl);
-
-                        // thats important so we don't delete the blob file
-                        // after complete the processing!
-                        mail.setAttribute(JAMES_REUSE_BLOB_URL, true);
                         reuse = true;
-
                     }
 
-                }
+                }*/
                 if (blobMessage == null) {
                     // just use the MimeMessageInputStream which can read every
                     // MimeMessage implementation
                     blobMessage = amqSession.createBlobMessage(new MimeMessageInputStream(wrapper));
                 }
-
+                 
+                    
                 // store the queue name in the props
                 props.put(JAMES_QUEUE_NAME, queuename);
 
@@ -220,10 +209,14 @@ public class ActiveMQMailQueue extends J
                     blobMessage.setObjectProperty(entry.getKey(), entry.getValue());
                 }
                 producer.send(blobMessage, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);
+                    
+              
+
             } else {
                 super.produceMail(session, props, msgPrio, mail);
             }
         } catch (JMSException e) {
+            e.printStackTrace();
             if (!reuse && blobMessage != null && blobMessage instanceof ActiveMQBlobMessage) {
                 ((ActiveMQBlobMessage) blobMessage).deleteFile();
             }

Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java?rev=1101872&r1=1101871&r2=1101872&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/FileSystemBlobStrategy.java Wed May 11 13:37:04 2011
@@ -25,10 +25,8 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URL;
-import java.util.concurrent.ConcurrentHashMap;
 
 import javax.jms.JMSException;
-import javax.mail.util.SharedFileInputStream;
 
 import org.apache.activemq.BlobMessage;
 import org.apache.activemq.blob.BlobDownloadStrategy;
@@ -46,7 +44,6 @@ public class FileSystemBlobStrategy impl
 
     private final FileSystem fs;
     private final BlobTransferPolicy policy;
-    private final ConcurrentHashMap<String, SharedFileInputStream> map = new ConcurrentHashMap<String, SharedFileInputStream>();
     private int splitCount;
 
     public FileSystemBlobStrategy(final BlobTransferPolicy policy, final FileSystem fs, int splitCount) {
@@ -115,13 +112,6 @@ public class FileSystemBlobStrategy impl
      */
     public void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException {
         File f = getFile(message);
-        SharedFileInputStream in = map.remove(f.getCanonicalPath());
-        try {
-            if (in != null)
-                in.close();
-        } catch (IOException e) {
-            // ignore here
-        }
         if (f.exists()) {
             if (f.delete() == false) {
                 throw new IOException("Unable to delete file " + f);
@@ -134,15 +124,7 @@ public class FileSystemBlobStrategy impl
      */
     public InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException {
         File f = getFile(message);
-        String key = f.getCanonicalPath();
-        // use exactly one SharedFileInputStream per file so we can keep track
-        // of filehandles
-        // See JAMES-1122
-        SharedFileInputStream in = map.putIfAbsent(key, new SharedFileInputStream(f));
-        if (in == null) {
-            in = map.get(key);
-        }
-        return in;
+        return new FileInputStream(f);
     }
 
     /**

Modified: james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java
URL: http://svn.apache.org/viewvc/james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java?rev=1101872&r1=1101871&r2=1101872&view=diff
==============================================================================
--- james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java (original)
+++ james/server/trunk/queue-activemq/src/main/java/org/apache/james/queue/activemq/MimeMessageBlobMessageSource.java Wed May 11 13:37:04 2011
@@ -20,28 +20,23 @@ package org.apache.james.queue.activemq;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
 
-import javax.mail.internet.SharedInputStream;
+import javax.jms.JMSException;
 
+import org.apache.activemq.BlobMessage;
 import org.apache.james.core.MimeMessageSource;
-import org.apache.james.lifecycle.api.Disposable;
 
 /**
  *
  */
-public class MimeMessageBlobMessageSource extends MimeMessageSource implements ActiveMQSupport, Disposable {
+public class MimeMessageBlobMessageSource extends MimeMessageSource implements ActiveMQSupport {
 
-    private SharedInputStream in;
     private String sourceId;
-    private long size;
-    private List<InputStream> streams = new ArrayList<InputStream>();
+    private BlobMessage message;
 
-    public MimeMessageBlobMessageSource(SharedInputStream in, long size, String sourceId) {
-        this.in = in;
-        this.size = size;
-        this.sourceId = sourceId;
+    public MimeMessageBlobMessageSource(BlobMessage message) throws JMSException {
+        this.message = message;
+        this.sourceId = message.getJMSMessageID();
     }
 
     /*
@@ -49,10 +44,12 @@ public class MimeMessageBlobMessageSourc
      * 
      * @see org.apache.james.core.MimeMessageSource#getInputStream()
      */
-    public synchronized InputStream getInputStream() throws IOException {
-        InputStream sin = in.newStream(0, -1);
-        streams.add(sin);
-        return sin;
+    public InputStream getInputStream() throws IOException {
+        try {
+            return message.getInputStream();
+        } catch (JMSException e) {
+            throw new IOException("Unable to open stream", e);
+        }
     }
 
     /*
@@ -66,34 +63,18 @@ public class MimeMessageBlobMessageSourc
 
     @Override
     public long getMessageSize() throws IOException {
-        // if the size is < 1 we seems to not had it stored in the property, so
-        // fallback to super implementation
-        if (size == -1) {
-            super.getMessageSize();
-        }
-        return size;
-    }
-
-    /**
-     * Call dispose on the {@link InputStream}
-     */
-    public synchronized void dispose() {
-
         try {
-            ((InputStream) in).close();
-        } catch (IOException e) {
-            // ignore on dispose
-        }
-        in = null;
-        for (int i = 0; i < streams.size(); i++) {
-            InputStream s = streams.get(i);
-            try {
-                s.close();
-            } catch (IOException e) {
-                // ignore on dispose
+            long size = message.getLongProperty(JAMES_MAIL_MESSAGE_SIZE);
+
+            // if the size is < 1 we seems to not had it stored in the property, so
+            // fallback to super implementation
+            if (size == -1) {
+                super.getMessageSize();
             }
-            s = null;
+            return size;
+        } catch (JMSException e) {
+            throw new IOException("Unable to get message size", e);
         }
-        streams.clear();
+
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org