You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/15 02:00:26 UTC

[james-project] 08/10: JAMES-3720 RabbitMQMailQueue::requeue needs to dispose of emails

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8a70627ac20f60802764b3ed323201b1aad075a2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 4 21:16:06 2022 +0700

    JAMES-3720 RabbitMQMailQueue::requeue needs to dispose of emails
    
    Failure to do so will lead to a temporary file leak...
    
    (cherry picked from commit 02aee6e965eef8ab192b9b3cb4c6275989b24be6)
---
 .../main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java  | 9 ++++++++-
 .../java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java  | 2 ++
 .../view/cassandra/model/EnqueuedItemWithSlicingContext.java     | 8 +++++++-
 3 files changed, 17 insertions(+), 2 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java
index 4bdc4a9..daa424a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java
@@ -23,11 +23,13 @@ import java.time.Instant;
 import java.util.Objects;
 
 import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.lifecycle.api.Disposable;
+import org.apache.james.lifecycle.api.LifecycleUtil;
 import org.apache.mailet.Mail;
 
 import com.google.common.base.Preconditions;
 
-public class EnqueuedItem {
+public class EnqueuedItem implements Disposable {
 
     interface Builder {
 
@@ -122,6 +124,11 @@ public class EnqueuedItem {
     }
 
     @Override
+    public void dispose() {
+        LifecycleUtil.dispose(mail);
+    }
+
+    @Override
     public final boolean equals(Object o) {
         if (o instanceof EnqueuedItem) {
             EnqueuedItem that = (EnqueuedItem) o;
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 85f651d..602e679 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -40,6 +40,7 @@ import com.google.common.base.MoreObjects;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public class RabbitMQMailQueue implements ManageableMailQueue {
 
@@ -138,6 +139,7 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
     public Flux<String> republishNotProcessedMails(Instant olderThan) {
         Function<CassandraMailQueueBrowser.CassandraMailQueueItemView, Mono<String>> requeue = item ->
             enqueuer.reQueue(item)
+                .then(Mono.fromRunnable(item::dispose).subscribeOn(Schedulers.elastic()))
                 .thenReturn(item.getMail().getName());
 
         return mailQueueView.browseOlderThanReactive(olderThan)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java
index 45ff74c..b154b87 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java
@@ -22,11 +22,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model;
 import java.time.Instant;
 import java.util.Objects;
 
+import org.apache.james.lifecycle.api.Disposable;
 import org.apache.james.queue.rabbitmq.EnqueuedItem;
 
 import com.google.common.base.Preconditions;
 
-public class EnqueuedItemWithSlicingContext {
+public class EnqueuedItemWithSlicingContext implements Disposable {
 
     public static class SlicingContext {
 
@@ -117,6 +118,11 @@ public class EnqueuedItemWithSlicingContext {
     }
 
     @Override
+    public void dispose() {
+        enqueuedItem.dispose();
+    }
+
+    @Override
     public final boolean equals(Object o) {
         if (o instanceof EnqueuedItemWithSlicingContext) {
             EnqueuedItemWithSlicingContext that = (EnqueuedItemWithSlicingContext) o;

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org