You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/02/25 14:14:17 UTC

[james-project] branch 3.7.x updated: JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1464)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/3.7.x by this push:
     new 70c7ad2a5f JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1464)
70c7ad2a5f is described below

commit 70c7ad2a5fbb31bc1f31cf734624d80a171cc265
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sat Feb 25 21:14:12 2023 +0700

    JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1464)
---
 .../queue/activemq/ActiveMQCacheableMailQueue.java | 27 ++++++++++++++++++----
 .../james/queue/jms/JMSCacheableMailQueue.java     |  7 +++---
 2 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
index 01596cb926..6660a2a7db 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
@@ -18,6 +18,7 @@
  ****************************************************************/
 package org.apache.james.queue.activemq;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.util.List;
@@ -56,6 +57,8 @@ import org.apache.mailet.Mail;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import reactor.core.publisher.Mono;
+
 /**
  * <p>
  * {@link MailQueue} implementation which use an ActiveMQ Queue.
@@ -226,10 +229,26 @@ public class ActiveMQCacheableMailQueue extends JMSCacheableMailQueue implements
     }
 
     @Override
-    protected MailQueueItem createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
-        Mail mail = createMail(message);
-        ActiveMQMailQueueItem activeMQMailQueueItem = new ActiveMQMailQueueItem(mail, session, consumer, message);
-        return mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem, queueName);
+    protected Mono<MailQueueItem> createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
+        try {
+            Mail mail = createMail(message);
+            ActiveMQMailQueueItem activeMQMailQueueItem = new ActiveMQMailQueueItem(mail, session, consumer, message);
+            return Mono.just(mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem, queueName));
+        } catch (MessagingException e) {
+            if (e.getCause() instanceof FileNotFoundException) {
+                LOGGER.warn("Blob message cannot be found, discarding email", e);
+                try {
+                    session.commit();
+                } catch (JMSException ex) {
+                    throw new MailQueueException("Unable to commit dequeue operation for mail", ex);
+                } finally {
+                    JMSCacheableMailQueue.closeConsumer(consumer);
+                    JMSCacheableMailQueue.closeSession(session);
+                }
+                return Mono.empty();
+            }
+            return Mono.error(e);
+        }
     }
 
     @Override
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 90d60e01c6..9ec0ff852e 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -246,7 +246,7 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
 
             if (message != null) {
                 dequeuedMailsMetric.increment();
-                return Mono.just(createMailQueueItem(session, consumer, message));
+                return createMailQueueItem(session, consumer, message);
             } else {
                 session.commit();
                 closeConsumer(consumer);
@@ -405,7 +405,6 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
         } else {
             throw new MailQueueException("Not supported JMS Message received " + message);
         }
-
     }
 
     /**
@@ -509,10 +508,10 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
      * @throws JMSException
      * @throws MessagingException
      */
-    protected MailQueueItem createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
+    protected Mono<MailQueueItem> createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
         Mail mail = createMail(message);
         JMSMailQueueItem jmsMailQueueItem = new JMSMailQueueItem(mail, session, consumer);
-        return mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem, queueName);
+        return Mono.just(mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem, queueName));
     }
 
     protected String getMessageSelector() {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org