You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:27 UTC
[james-project] 07/07: JAMES-3265 RabbitMQ MailQueue should
deadletter dropped messages
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit aa6ac0bf7a5a2403b9ef28a8db50639129ae0007
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 2 16:55:55 2020 +0700
JAMES-3265 RabbitMQ MailQueue should deadletter dropped messages
This prevents loosing dropped messages
---
.../org/apache/james/queue/rabbitmq/Enqueuer.java | 2 ++
.../apache/james/queue/rabbitmq/MailQueueName.java | 10 ++++++++
.../queue/rabbitmq/RabbitMQMailQueueFactory.java | 13 ++++++++++
.../queue/rabbitmq/RabbitMQMailQueueTest.java | 30 +++++++++++++++++++++-
4 files changed, 54 insertions(+), 1 deletion(-)
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 481e380..c134798 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -38,6 +38,7 @@ import org.apache.mailet.Mail;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableMap;
import com.rabbitmq.client.AMQP;
import reactor.core.publisher.Mono;
@@ -88,6 +89,7 @@ class Enqueuer {
.deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
.priority(PERSISTENT_TEXT_PLAIN.getPriority())
.contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+ .headers(ImmutableMap.of("x-dead-letter-routing-key", EMPTY_ROUTING_KEY))
.build();
OutboundMessage data = new OutboundMessage(
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
index dfc352d..f70151e 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
@@ -112,6 +112,8 @@ public final class MailQueueName {
private static final String PREFIX = "JamesMailQueue";
private static final String EXCHANGE_PREFIX = PREFIX + "-exchange-";
+ private static final String DEAD_LETTER_EXCHANGE_PREFIX = PREFIX + "-dead-letter-exchange-";
+ private static final String DEAD_LETTER_QUEUE_PREFIX = PREFIX + "-dead-letter-queue-";
@VisibleForTesting static final String WORKQUEUE_PREFIX = PREFIX + "-workqueue-";
public static MailQueueName fromString(String name) {
@@ -134,6 +136,14 @@ public final class MailQueueName {
return name;
}
+ String toDeadLetterExchangeName() {
+ return DEAD_LETTER_EXCHANGE_PREFIX + name;
+ }
+
+ String toDeadLetterQueueName() {
+ return DEAD_LETTER_QUEUE_PREFIX + name;
+ }
+
ExchangeName toRabbitExchangeName() {
return new ExchangeName(name);
}
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index e9b3c32..4dd0181 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -46,6 +46,7 @@ import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Flux;
@@ -156,14 +157,26 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
.durable(true)
.type("direct")),
+ sender.declareExchange(ExchangeSpecification.exchange(mailQueueName.toDeadLetterExchangeName())
+ .durable(true)
+ .type("direct")),
sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
.durable(DURABLE)
.exclusive(!EXCLUSIVE)
.autoDelete(!AUTO_DELETE)
+ .arguments(ImmutableMap.of("x-dead-letter-exchange", mailQueueName.toDeadLetterExchangeName()))),
+ sender.declareQueue(QueueSpecification.queue(mailQueueName.toDeadLetterQueueName())
+ .durable(DURABLE)
+ .exclusive(!EXCLUSIVE)
+ .autoDelete(!AUTO_DELETE)
.arguments(NO_ARGUMENTS)),
sender.bind(BindingSpecification.binding()
.exchange(mailQueueName.toRabbitExchangeName().asString())
.queue(mailQueueName.toWorkQueueName().asString())
+ .routingKey(EMPTY_ROUTING_KEY)),
+ sender.bind(BindingSpecification.binding()
+ .exchange(mailQueueName.toDeadLetterExchangeName())
+ .queue(mailQueueName.toDeadLetterQueueName())
.routingKey(EMPTY_ROUTING_KEY)))
.then()
.block();
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 0289266..bf2d209 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -35,6 +35,7 @@ import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -327,7 +328,7 @@ class RabbitMQMailQueueTest {
.times(1)
.whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
- List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
+ List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
.take(3)
.collectList()
.block(Duration.ofSeconds(10));
@@ -457,6 +458,33 @@ class RabbitMQMailQueueTest {
.untilAsserted(() -> assertThat(dequeuedMailNames)
.containsExactly(name1, name2, name3));
}
+
+ @Test
+ void rejectedMessagesShouldBeDeadLettered() {
+ String emptyRoutingKey = "";
+ rabbitMQExtension.getSender()
+ .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool",
+ emptyRoutingKey,
+ "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+ .block();
+
+ AtomicInteger deadLetteredCount = new AtomicInteger();
+ rabbitMQExtension.getRabbitChannelPool()
+ .createReceiver()
+ .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool")
+ .doOnNext(next -> deadLetteredCount.incrementAndGet())
+ .subscribeOn(Schedulers.elastic())
+ .subscribe();
+
+ Flux.from(getMailQueue().deQueue())
+ .doOnNext(Throwing.consumer(item -> item.done(true)))
+ .subscribeOn(Schedulers.elastic())
+ .subscribe();
+
+
+ Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+ .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
+ }
}
@Nested
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org