You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by ka...@apache.org on 2022/09/01 20:07:39 UTC

[james-project] 01/02: JAMES-3810 Dequeuer should nack when fails to see if email was deleted

This is an automated email from the ASF dual-hosted git repository.

kao pushed a commit to branch 3.7.x
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 903f2025529e66e099955c370a219f42fabe1c6f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Aug 22 23:24:05 2022 +0700

    JAMES-3810 Dequeuer should nack when fails to see if email was deleted
    
    Failure to do so leaves messages unacknowledged and would eventually halt delivery.
    
    (cherry picked from commit 93eeeb65dfeacfbdca81f01ec7286f919cc50d90)
    
    Backport: Uses elastic scheduler since there is no BLOCKING_CALL_WRAPPER yet
---
 .../main/java/org/apache/james/queue/rabbitmq/Dequeuer.java   | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 92c549d23a..573b7a2497 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -41,6 +41,7 @@ import com.github.fge.lambdas.consumers.ThrowingConsumer;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 import reactor.rabbitmq.AcknowledgableDelivery;
 import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.Receiver;
@@ -108,14 +109,20 @@ class Dequeuer {
 
     private Mono<RabbitMQMailQueueItem> filterIfDeleted(RabbitMQMailQueueItem item) {
         return mailQueueView.isPresent(item.getEnqueueId())
-            .handle((isPresent, sink) -> {
+            .<RabbitMQMailQueueItem>handle((isPresent, sink) -> {
                 if (isPresent) {
                     sink.next(item);
                 } else {
                     item.done(true);
                     sink.complete();
                 }
-            });
+            })
+            .onErrorResume(e -> Mono.fromRunnable(() -> {
+                LOGGER.error("Failure to see if {} was deleted", item.enqueueId.asUUID(), e);
+                item.done(false);
+            })
+                .subscribeOn(Schedulers.elastic())
+                .then(Mono.error(e)));
     }
 
     private Mono<RabbitMQMailQueueItem> loadItem(AcknowledgableDelivery response) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org