You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:21 UTC
[james-project] 01/07: JAMES-3290 Mails failing to load should not be lost by dequeue
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ea6e8843783d7f43b4badc84860b6c3deadbc62e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 15:45:05 2020 +0700
JAMES-3290 Mails failing to load should not be lost by dequeue
---
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 12 ++++++--
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 33 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index b13b613..27fe748 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -33,6 +33,8 @@ import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.rabbitmq.view.api.DeleteCondition;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import org.apache.mailet.Mail;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
@@ -46,6 +48,7 @@ import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
class Dequeuer implements Closeable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(Dequeuer.class);
private static final boolean REQUEUE = true;
private static class RabbitMQMailQueueItem implements MailQueue.MailQueueItem {
@@ -136,9 +139,14 @@ class Dequeuer implements Closeable {
};
}
- private Mono<MailWithEnqueueId> loadMail(Delivery response) {
+ private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
return toMailReference(response)
- .flatMap(mailLoader::load);
+ .flatMap(reference -> mailLoader.load(reference)
+ .onErrorResume(e -> {
+ LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
+ response.nack(REQUEUE);
+ return Mono.empty();
+ }));
}
private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index d34a953..6bf316b 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -20,6 +20,7 @@
package org.apache.james.queue.rabbitmq;
import static java.time.temporal.ChronoUnit.HOURS;
+import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -296,6 +297,38 @@ class RabbitMQMailQueueTest {
}))
.blockLast();
}
+
+ @Test
+ void dequeueShouldRetryLoadingErrors(CassandraCluster cassandra) throws Exception {
+ String name1 = "myMail1";
+ String name2 = "myMail2";
+ String name3 = "myMail3";
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name2)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name3)
+ .build());
+
+ cassandra.getConf().registerScenario(fail()
+ .times(1)
+ .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+ List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
+ .take(3)
+ .collectList()
+ .block(Duration.ofSeconds(10));
+
+ assertThat(items)
+ .extracting(item -> item.getMail().getName())
+ .containsOnly(name1, name2, name3);
+ }
}
@Nested
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org