You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:23 UTC

[james-project] 03/07: JAMES-3290 Prevent an infinite loop when a RabbitMQMailQueue message references a blob that no longer exists.

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b3ecc4645581be9625ea7f4912886f9075459074
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 17:08:22 2020 +0700

    JAMES-3290 Prevent an infinite loop when a RabbitMQMailQueue message references a blob that no longer exists.
---
 .../org/apache/james/queue/rabbitmq/Dequeuer.java  |  6 +++
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 49 ++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 27fe748..09a3df6 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.function.Consumer;
 
 import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.blob.api.ObjectNotFoundException;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.queue.api.MailQueue;
@@ -142,6 +143,11 @@ class Dequeuer implements Closeable {
     private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
         return toMailReference(response)
             .flatMap(reference -> mailLoader.load(reference)
+                .onErrorResume(ObjectNotFoundException.class, e -> {
+                    LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e);
+                    response.nack(!REQUEUE);
+                    return Mono.empty();
+                })
                 .onErrorResume(e -> {
                     LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
                     response.nack(REQUEUE);
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 6bf316b..ee32d8a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -20,7 +20,9 @@
 package org.apache.james.queue.rabbitmq;
 
 import static java.time.temporal.ChronoUnit.HOURS;
+import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
 import static org.apache.james.queue.api.Mails.defaultMail;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -31,6 +33,7 @@ import static org.mockito.Mockito.verify;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -72,6 +75,7 @@ import com.github.fge.lambdas.Throwing;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 class RabbitMQMailQueueTest {
     private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -329,6 +333,51 @@ class RabbitMQMailQueueTest {
                 .extracting(item -> item.getMail().getName())
                 .containsOnly(name1, name2, name3);
         }
+
+        @Test
+        void dequeueShouldNotRetryWhenBlobIsMissing(CassandraCluster cassandra) throws Exception {
+            String name1 = "myMail1";
+            String name2 = "myMail2";
+            String name3 = "myMail3";
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name1)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name2)
+                .build());
+
+            getMailQueue().enQueue(defaultMail()
+                .name(name3)
+                .build());
+
+            cassandra.getConf().registerScenario(returnEmpty()
+                .forever()
+                .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+            ConcurrentLinkedDeque<String> dequeuedNames = new ConcurrentLinkedDeque<>();
+            Flux.from(getMailQueue().deQueue())
+                .take(3)
+                .doOnNext(item -> dequeuedNames.add(item.getMail().getName()))
+                .doOnNext(Throwing.consumer(item -> item.done(true)))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+            // One second should be enough for the dequeue attempts to run while the blob read query returns nothing
+            Thread.sleep(1000);
+
+            // Restore normal behaviour of the blob read query registered above
+            cassandra.getConf().registerScenario(executeNormally()
+                .forever()
+                .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+            // Wait one more second: any message still left in the queue could now be loaded and dequeued
+            Thread.sleep(1000);
+
+            // We expect messages referencing missing blobs to have been purged from the queue, so none was dequeued
+            assertThat(dequeuedNames).isEmpty();
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org