You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/15 02:00:26 UTC
[james-project] 08/10: JAMES-3720 RabbitMQMailQueue::requeue needs to dispose emails
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 8a70627ac20f60802764b3ed323201b1aad075a2
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Mar 4 21:16:06 2022 +0700
JAMES-3720 RabbitMQMailQueue::requeue needs to dispose emails
Failure to do so will lead to a temporary file leak...
(cherry picked from commit 02aee6e965eef8ab192b9b3cb4c6275989b24be6)
---
.../main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java | 9 ++++++++-
.../java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java | 2 ++
.../view/cassandra/model/EnqueuedItemWithSlicingContext.java | 8 +++++++-
3 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java
index 4bdc4a9..daa424a 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/EnqueuedItem.java
@@ -23,11 +23,13 @@ import java.time.Instant;
import java.util.Objects;
import org.apache.james.blob.mail.MimeMessagePartsId;
+import org.apache.james.lifecycle.api.Disposable;
+import org.apache.james.lifecycle.api.LifecycleUtil;
import org.apache.mailet.Mail;
import com.google.common.base.Preconditions;
-public class EnqueuedItem {
+public class EnqueuedItem implements Disposable {
interface Builder {
@@ -122,6 +124,11 @@ public class EnqueuedItem {
}
@Override
+ public void dispose() {
+ LifecycleUtil.dispose(mail);
+ }
+
+ @Override
public final boolean equals(Object o) {
if (o instanceof EnqueuedItem) {
EnqueuedItem that = (EnqueuedItem) o;
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
index 85f651d..602e679 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueue.java
@@ -40,6 +40,7 @@ import com.google.common.base.MoreObjects;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
public class RabbitMQMailQueue implements ManageableMailQueue {
@@ -138,6 +139,7 @@ public class RabbitMQMailQueue implements ManageableMailQueue {
public Flux<String> republishNotProcessedMails(Instant olderThan) {
Function<CassandraMailQueueBrowser.CassandraMailQueueItemView, Mono<String>> requeue = item ->
enqueuer.reQueue(item)
+ .then(Mono.fromRunnable(item::dispose).subscribeOn(Schedulers.elastic()))
.thenReturn(item.getMail().getName());
return mailQueueView.browseOlderThanReactive(olderThan)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java
index 45ff74c..b154b87 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/model/EnqueuedItemWithSlicingContext.java
@@ -22,11 +22,12 @@ package org.apache.james.queue.rabbitmq.view.cassandra.model;
import java.time.Instant;
import java.util.Objects;
+import org.apache.james.lifecycle.api.Disposable;
import org.apache.james.queue.rabbitmq.EnqueuedItem;
import com.google.common.base.Preconditions;
-public class EnqueuedItemWithSlicingContext {
+public class EnqueuedItemWithSlicingContext implements Disposable {
public static class SlicingContext {
@@ -117,6 +118,11 @@ public class EnqueuedItemWithSlicingContext {
}
@Override
+ public void dispose() {
+ enqueuedItem.dispose();
+ }
+
+ @Override
public final boolean equals(Object o) {
if (o instanceof EnqueuedItemWithSlicingContext) {
EnqueuedItemWithSlicingContext that = (EnqueuedItemWithSlicingContext) o;
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org