You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:21 UTC

[james-project] 01/07: JAMES-3290 Mails failing to load should not be lost by dequeue

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ea6e8843783d7f43b4badc84860b6c3deadbc62e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 15:45:05 2020 +0700

    JAMES-3290 Mails failing to load should not be lost by dequeue
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 12 ++++++--
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 33 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b13b613..27fe748 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -33,6 +33,8 @@ import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
@@ -46,6 +48,7 @@ import reactor.rabbitmq.ConsumeOptions;
 import reactor.rabbitmq.Receiver;
 
 class Dequeuer implements Closeable {
+    private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class);
     private static final boolean REQUEUE = true;
 
     private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
@@ -136,9 +139,14 @@ class Dequeuer implements Closeable {
         };
     }
 
-    private Mono<MailWithEnqueueId> loadMail(Delivery response) {
+    private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) { // needs the acknowledgable delivery so a failed load can be nack'ed below
         return toMailReference(response)
-            .flatMap(mailLoader::load);
+            .flatMap(reference -> mailLoader.load(reference) // error handling is attached per reference so the log line can name the failing mail
+                .onErrorResume(e -> { // a load failure is handled here instead of being propagated to the subscriber
+                    LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
+                    response.nack(REQUEUE); // REQUEUE is true: negative-acknowledge the delivery and ask for it to be requeued
+                    return Mono.empty(); // emit no mail for this delivery; the Mono completes empty rather than with an error
+                }));
     }
 
     private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index d34a953..6bf316b 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -20,6 +20,7 @@
 package org.apache.james.queue.rabbitmq;
 
 import static java.time.temporal.ChronoUnit.HOURS;
+import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -296,6 +297,38 @@ class RabbitMQMailQueueTest {
                 }))
                 .blockLast();
         }
+
+        @Test
+        void dequeueShouldRetryLoadingErrors(CassandraCluster cassandra) throws Exception { // enqueue three mails, inject one failure, expect all three to be dequeued
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            cassandra.getConf().registerScenario(fail() // failure is registered after the enqueues and before the dequeue starts
+                .times(1) // NOTE(review): presumably only the first matching query fails - confirm against Scenario.Builder
+                .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+            List<MailQueue.MailQueueItem> items =  Flux.from(getMailQueue().deQueue())
+                .take(3) // stop once three items have been received
+                .collectList()
+                .block(Duration.ofSeconds(10)); // bounded wait: the test fails instead of hanging if an item never arrives
+
+            assertThat(items)
+                .extracting(item -> item.getMail().getName())
+                .containsOnly(name1, name2, name3); // every enqueued name must be present and no other name may appear
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org