You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:23 UTC
[james-project] 03/07: JAMES-3290 Prevent an infinite loop when a
RabbitMQMailQueue message references a blob that no longer exists.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b3ecc4645581be9625ea7f4912886f9075459074
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Jul 1 17:08:22 2020 +0700
JAMES-3290 Prevent an infinite loop when a RabbitMQMailQueue message references a blob that no longer exists.
---
.../org/apache/james/queue/rabbitmq/Dequeuer.java | 6 +++
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 49 ++++++++++++++++++++++
2 files changed, 55 insertions(+)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
index 27fe748..09a3df6 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Dequeuer.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.function.Consumer;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.blob.api.ObjectNotFoundException;
import org.apache.james.metrics.api.Metric;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.queue.api.MailQueue;
@@ -142,6 +143,11 @@ class Dequeuer implements Closeable {
private Mono<MailWithEnqueueId> loadMail(AcknowledgableDelivery response) {
return toMailReference(response)
.flatMap(reference -> mailLoader.load(reference)
+ .onErrorResume(ObjectNotFoundException.class, e -> {
+ LOGGER.error("Fail to load mail {} with enqueueId {} as underlying blobs do not exist. Discarding this message to prevent an infinite loop.", reference.getName(), reference.getEnqueueId(), e);
+ response.nack(!REQUEUE);
+ return Mono.empty();
+ })
.onErrorResume(e -> {
LOGGER.error("Fail to load mail {} with enqueueId {}", reference.getName(), reference.getEnqueueId(), e);
response.nack(REQUEUE);
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 6bf316b..ee32d8a 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -20,7 +20,9 @@
package org.apache.james.queue.rabbitmq;
import static java.time.temporal.ChronoUnit.HOURS;
+import static org.apache.james.backends.cassandra.Scenario.Builder.executeNormally;
import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
+import static org.apache.james.backends.cassandra.Scenario.Builder.returnEmpty;
import static org.apache.james.queue.api.Mails.defaultMail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
@@ -31,6 +33,7 @@ import static org.mockito.Mockito.verify;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -72,6 +75,7 @@ import com.github.fge.lambdas.Throwing;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
class RabbitMQMailQueueTest {
private static final HashBlobId.Factory BLOB_ID_FACTORY = new HashBlobId.Factory();
@@ -329,6 +333,51 @@ class RabbitMQMailQueueTest {
.extracting(item -> item.getMail().getName())
.containsOnly(name1, name2, name3);
}
+
+ @Test
+ void dequeueShouldNotRetryWhenBlobIsMissing(CassandraCluster cassandra) throws Exception {
+ String name1 = "myMail1";
+ String name2 = "myMail2";
+ String name3 = "myMail3";
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name1)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name2)
+ .build());
+
+ getMailQueue().enQueue(defaultMail()
+ .name(name3)
+ .build());
+
+ cassandra.getConf().registerScenario(returnEmpty()
+ .forever()
+ .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+ ConcurrentLinkedDeque<String> dequeuedNames = new ConcurrentLinkedDeque<>();
+ Flux.from(getMailQueue().deQueue())
+ .take(3)
+ .doOnNext(item -> dequeuedNames.add(item.getMail().getName()))
+ .doOnNext(Throwing.consumer(item -> item.done(true)))
+ .subscribeOn(Schedulers.elastic())
+ .subscribe();
+
+ // One second should be enough to attempt dequeues while we fail to load blobs
+ Thread.sleep(1000);
+
+ // Restore normal behaviour
+ cassandra.getConf().registerScenario(executeNormally()
+ .forever()
+ .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
+
+        // Allow one more second, then check whether anything was dequeued
+ Thread.sleep(1000);
+
+        // We expect mails whose referenced blobs are missing to be purged from the queue
+ assertThat(dequeuedNames).isEmpty();
+ }
}
@Nested
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org