You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:26 UTC
[james-project] 06/07: JAMES-3290 Many invalid JSON should not abort dequeue
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9f0a943fd2d07ae9d0b26a35ce1293cd0f325efc
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jul 3 09:31:13 2020 +0700
JAMES-3290 Many invalid JSON should not abort dequeue
---
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 14 ++++----
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 39 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 7 deletions(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index bacd1ff..fe209f9 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
import com.github.fge.lambdas.Throwing;
import com.github.fge.lambdas.consumers.ThrowingConsumer;
-import com.rabbitmq.client.Delivery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -139,26 +138,27 @@ class Dequeuer implements Closeable {
};
}
- private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
- return toMailReference(response)
+ private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery delivery) {
+ return toMailReference(delivery)
.flatMap(reference -> mailLoader.load(reference)
.onErrorResume(ObjectNotFoundException.class, e -> {
LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e);
- response.nack(!REQUEUE);
+ delivery.nack(!REQUEUE);
return Mono.empty();
})
.onErrorResume(e -> {
LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
- response.nack(REQUEUE);
+ delivery.nack(REQUEUE);
return Mono.empty();
}));
}
- private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
- return Mono.fromCallable(getResponse::getBody)
+ private Mono<MailReferenceDTO> toMailReference(AcknowledgableDelivery delivery) {
+ return Mono.fromCallable(delivery::getBody)
.map(Throwing.function(mailReferenceSerializer::read).sneakyThrow())
.onErrorResume(e -> {
LOGGER.error("Fail to deserialize MailReferenceDTO. Discarding this message to prevent an infinite loop.", e);
+ delivery.nack(!REQUEUE);
return Mono.empty();
});
}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index abae166..0289266 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -418,6 +418,45 @@ class RabbitMQMailQueueTest {
.untilAsserted(() -> assertThat(dequeuedMailNames)
.containsExactly(name1, name2, name3));
}
+
+ @Test
+ void manyInvalidMessagesShouldNotAbortProcessing() throws Exception {
+ String name1 = "myMail1";
+ String name2 = "myMail2";
+ String name3 = "myMail3";
+
+ String emptyRoutingKey = "";
+
+ IntStream.range(0, 100)
+ .forEach(i -> rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool",
+ emptyRoutingKey,
+ ("BAD_PAYLOAD " + i).getBytes(StandardCharsets.UTF_8))))
+ .block());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name2)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name3)
+ .build());
+
+ ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>();
+
+ Flux.from(getMailQueue().deQueue())
+ .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName()))
+ .doOnNext(Throwing.consumer(item -> item.done(true)))
+ .subscribe();
+
+ Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+ .untilAsserted(() -> assertThat(dequeuedMailNames)
+ .containsExactly(name1, name2, name3));
+ }
}
@Nested
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org