You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/07/03 10:48:27 UTC

[james-project] 07/07: JAMES-3265 RabbitMQ MailQueue should deadletter dropped messages

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit aa6ac0bf7a5a2403b9ef28a8db50639129ae0007
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jul 2 16:55:55 2020 +0700

    JAMES-3265 RabbitMQ MailQueue should deadletter dropped messages
    
    This prevents loosing dropped messages
---
 .../org/apache/james/queue/rabbitmq/Enqueuer.java  |  2 ++
 .../apache/james/queue/rabbitmq/MailQueueName.java | 10 ++++++++
 .../queue/rabbitmq/RabbitMQMailQueueFactory.java   | 13 ++++++++++
 .../queue/rabbitmq/RabbitMQMailQueueTest.java      | 30 +++++++++++++++++++++-
 4 files changed, 54 insertions(+), 1 deletion(-)

diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
index 481e380..c134798 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/Enqueuer.java
@@ -38,6 +38,7 @@ import org.apache.mailet.Mail;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.github.fge.lambdas.Throwing;
+import com.google.common.collect.ImmutableMap;
 import com.rabbitmq.client.AMQP;
 
 import reactor.core.publisher.Mono;
@@ -88,6 +89,7 @@ class Enqueuer {
             .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
             .priority(PERSISTENT_TEXT_PLAIN.getPriority())
             .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .headers(ImmutableMap.of("x-dead-letter-routing-key", EMPTY_ROUTING_KEY))
             .build();
 
         OutboundMessage data = new OutboundMessage(
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
index dfc352d..f70151e 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/MailQueueName.java
@@ -112,6 +112,8 @@ public final class MailQueueName {
 
     private static final String PREFIX = "JamesMailQueue";
     private static final String EXCHANGE_PREFIX = PREFIX + "-exchange-";
+    private static final String DEAD_LETTER_EXCHANGE_PREFIX = PREFIX + "-dead-letter-exchange-";
+    private static final String DEAD_LETTER_QUEUE_PREFIX = PREFIX + "-dead-letter-queue-";
     @VisibleForTesting static final String WORKQUEUE_PREFIX = PREFIX + "-workqueue-";
 
     public static MailQueueName fromString(String name) {
@@ -134,6 +136,14 @@ public final class MailQueueName {
         return name;
     }
 
+    String toDeadLetterExchangeName() {
+        return DEAD_LETTER_EXCHANGE_PREFIX + name;
+    }
+
+    String toDeadLetterQueueName() {
+        return DEAD_LETTER_QUEUE_PREFIX + name;
+    }
+
     ExchangeName toRabbitExchangeName() {
         return new ExchangeName(name);
     }
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
index e9b3c32..4dd0181 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueFactory.java
@@ -46,6 +46,7 @@ import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
 import org.apache.james.queue.rabbitmq.view.api.MailQueueView;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Flux;
@@ -156,14 +157,26 @@ public class RabbitMQMailQueueFactory implements MailQueueFactory<RabbitMQMailQu
             sender.declareExchange(ExchangeSpecification.exchange(exchangeName)
                 .durable(true)
                 .type("direct")),
+            sender.declareExchange(ExchangeSpecification.exchange(mailQueueName.toDeadLetterExchangeName())
+                .durable(true)
+                .type("direct")),
             sender.declareQueue(QueueSpecification.queue(mailQueueName.toWorkQueueName().asString())
                 .durable(DURABLE)
                 .exclusive(!EXCLUSIVE)
                 .autoDelete(!AUTO_DELETE)
+                .arguments(ImmutableMap.of("x-dead-letter-exchange", mailQueueName.toDeadLetterExchangeName()))),
+            sender.declareQueue(QueueSpecification.queue(mailQueueName.toDeadLetterQueueName())
+                .durable(DURABLE)
+                .exclusive(!EXCLUSIVE)
+                .autoDelete(!AUTO_DELETE)
                 .arguments(NO_ARGUMENTS)),
             sender.bind(BindingSpecification.binding()
                 .exchange(mailQueueName.toRabbitExchangeName().asString())
                 .queue(mailQueueName.toWorkQueueName().asString())
+                .routingKey(EMPTY_ROUTING_KEY)),
+            sender.bind(BindingSpecification.binding()
+                .exchange(mailQueueName.toDeadLetterExchangeName())
+                .queue(mailQueueName.toDeadLetterQueueName())
                 .routingKey(EMPTY_ROUTING_KEY)))
             .then()
             .block();
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
index 0289266..bf2d209 100644
--- a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueTest.java
@@ -35,6 +35,7 @@ import java.time.Duration;
 import java.time.Instant;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -327,7 +328,7 @@ class RabbitMQMailQueueTest {
                 .times(1)
                 .whenQueryStartsWith("SELECT * FROM blobs WHERE id=:id;"));
 
-            List<MailQueue.MailQueueItem> items =  Flux.from(getMailQueue().deQueue())
+            List<MailQueue.MailQueueItem> items = Flux.from(getMailQueue().deQueue())
                 .take(3)
                 .collectList()
                 .block(Duration.ofSeconds(10));
@@ -457,6 +458,33 @@ class RabbitMQMailQueueTest {
                 .untilAsserted(() -> assertThat(dequeuedMailNames)
                     .containsExactly(name1, name2, name3));
         }
+
+        @Test
+        void rejectedMessagesShouldBeDeadLettered() {
+            String emptyRoutingKey = "";
+            rabbitMQExtension.getSender()
+                .send(Mono.just(new OutboundMessage("JamesMailQueue-exchange-spool",
+                    emptyRoutingKey,
+                    "BAD_PAYLOAD!".getBytes(StandardCharsets.UTF_8))))
+                .block();
+
+            AtomicInteger deadLetteredCount = new AtomicInteger();
+            rabbitMQExtension.getRabbitChannelPool()
+                .createReceiver()
+                .consumeAutoAck("JamesMailQueue-dead-letter-queue-spool")
+                .doOnNext(next -> deadLetteredCount.incrementAndGet())
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+            Flux.from(getMailQueue().deQueue())
+                .doOnNext(Throwing.consumer(item -> item.done(true)))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+
+
+            Awaitility.await().atMost(org.awaitility.Duration.TEN_SECONDS)
+                .untilAsserted(() -> assertThat(deadLetteredCount.get()).isEqualTo(1));
+        }
     }
 
     @Nested


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org