You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:26 UTC

[james-project] 06/07: JAMES-3290 Many invalid JSON should not abort dequeue

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9f0a943fd2d07ae9d0b26a35ce1293cd0f325efc
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Jul 3 09:31:13 2020 +0700

    JAMES-3290 Many invalid JSON should not abort dequeue
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  | 14 ++++----
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 39 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index bacd1ff..fe209f9 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -38,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import com.github.fge.lambdas.Throwing;
 import com.github.fge.lambdas.consumers.ThrowingConsumer;
-import com.rabbitmq.client.Delivery;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -139,26 +138,27 @@ class Dequeuer implements Closeable {
         };
     }
 
-    private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
-        return toMailReference(response)
+    private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery delivery) {
+        return toMailReference(delivery)
             .flatMap(reference -> mailLoader.load(reference)
                 .onErrorResume(ObjectNotFoundException.class, e -> {
                     LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e);
-                    response.nack(!REQUEUE);
+                    delivery.nack(!REQUEUE);
                     return Mono.empty();
                 })
                 .onErrorResume(e -> {
                     LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
-                    response.nack(REQUEUE);
+                    delivery.nack(REQUEUE);
                     return Mono.empty();
                 }));
     }
 
-    private Mono<MailReferenceDTO> toMailReference(Delivery getResponse) {
-        return Mono.fromCallable(getResponse::getBody)
+    private Mono<MailReferenceDTO> toMailReference(AcknowledgableDelivery delivery) {
+        return Mono.fromCallable(delivery::getBody)
             .map(Throwing.function(mailReferenceSerializer::read).sneakyThrow())
             .onErrorResume(e -> {
                 LOGGER.error("Fail to deserialize MailReferenceDTO. Discarding this message to prevent an infinite loop.", e);
+                delivery.nack(!REQUEUE);
                 return Mono.empty();
             });
     }
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index abae166..0289266 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -418,6 +418,45 @@ class RabbitMQMailQueueTest {
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
+
+        @Test
+        void manyInvalidMessagesShouldNotAbortProcessing() throws Exception {
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+
+            String emptyRoutingKey = "";
+
+            IntStream.range(0, 100)
+                .forEach(i -> rabbitMQExtension.getSender()
+                    .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool",
+                        emptyRoutingKey,
+                        ("BAD_PAYLOAD " + i).getBytes(StandardCharsets.UTF_8))))
+                    .block());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            ConcurrentLinkedDeque<String> dequeuedMailNames = new ConcurrentLinkedDeque<>();
+
+            Flux.from(getMailQueue().deQueue())
+                .doOnNext(item -> dequeuedMailNames.add(item.getMail().getName()))
+                .doOnNext(Throwing.consumer(item -> item.done(true)))
+                .subscribe();
+
+            Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+                .untilAsserted(() -> assertThat(dequeuedMailNames)
+                    .containsExactly(name1, name2, name3));
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org