You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/02/26 13:40:19 UTC
[james-project] branch master updated: JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1465)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 314b9dc960 JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1465)
314b9dc960 is described below
commit 314b9dc960dc9c452f5d2d8cd0065b0cc200673d
Author: Benoit TELLIER <bt...@linagora.com>
AuthorDate: Sun Feb 26 20:40:14 2023 +0700
JAMES-3891 ActiveMQCacheableMailQueue: discard emails not backed by a blob (#1465)
---
.../queue/activemq/ActiveMQCacheableMailQueue.java | 27 ++++++++++++++++++----
.../james/queue/jms/JMSCacheableMailQueue.java | 7 +++---
2 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
index 01596cb926..6660a2a7db 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQCacheableMailQueue.java
@@ -18,6 +18,7 @@
****************************************************************/
package org.apache.james.queue.activemq;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;
@@ -56,6 +57,8 @@ import org.apache.mailet.Mail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
/**
* <p>
* {@link MailQueue} implementation which use an ActiveMQ Queue.
@@ -226,10 +229,26 @@ public class ActiveMQCacheableMailQueue extends JMSCacheableMailQueue implements
}
@Override
- protected MailQueueItem createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
- Mail mail = createMail(message);
- ActiveMQMailQueueItem activeMQMailQueueItem = new ActiveMQMailQueueItem(mail, session, consumer, message);
- return mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem, queueName);
+ protected Mono<MailQueueItem> createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
+ try {
+ Mail mail = createMail(message);
+ ActiveMQMailQueueItem activeMQMailQueueItem = new ActiveMQMailQueueItem(mail, session, consumer, message);
+ return Mono.just(mailQueueItemDecoratorFactory.decorate(activeMQMailQueueItem, queueName));
+ } catch (MessagingException e) {
+ if (e.getCause() instanceof FileNotFoundException) {
+ LOGGER.warn("Blob message cannot be found, discarding email", e);
+ try {
+ session.commit();
+ } catch (JMSException ex) {
+ throw new MailQueueException("Unable to commit dequeue operation for mail", ex);
+ } finally {
+ JMSCacheableMailQueue.closeConsumer(consumer);
+ JMSCacheableMailQueue.closeSession(session);
+ }
+ return Mono.empty();
+ }
+ return Mono.error(e);
+ }
}
@Override
diff --git a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
index 8f5356810e..b69177e5c4 100644
--- a/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
+++ b/server/queue/queue-jms/src/main/java/org/apache/james/queue/jms/JMSCacheableMailQueue.java
@@ -246,7 +246,7 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
if (message != null) {
dequeuedMailsMetric.increment();
- return Mono.just(createMailQueueItem(session, consumer, message));
+ return createMailQueueItem(session, consumer, message);
} else {
session.commit();
closeConsumer(consumer);
@@ -405,7 +405,6 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
} else {
throw new MailQueueException("Not supported JMS Message received " + message);
}
-
}
/**
@@ -507,10 +506,10 @@ public class JMSCacheableMailQueue implements ManageableMailQueue, JMSSupport, M
* @throws JMSException
* @throws MessagingException
*/
- protected MailQueueItem createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
+ protected Mono<MailQueueItem> createMailQueueItem(Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
Mail mail = createMail(message);
JMSMailQueueItem jmsMailQueueItem = new JMSMailQueueItem(mail, session, consumer);
- return mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem, queueName);
+ return Mono.just(mailQueueItemDecoratorFactory.decorate(jmsMailQueueItem, queueName));
}
protected String getMessageSelector() {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org