You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/09/25 15:01:30 UTC

[james-project] 02/04: JAMES-3817 Implement health check for RabbitMQ dead letter queue of mail queue

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 709609759f56425e5d42840ff11b9f55ca1a40e7
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Tue Sep 20 16:13:41 2022 +0700

    JAMES-3817 Implement health check for RabbitMQ dead letter queue of mail queue
    
    Degraded if dead letter queue length is not equal to 0.
---
 .../modules/queue/rabbitmq/RabbitMQModule.java     |   5 +-
 ...abbitMQMailQueueDeadLetterQueueHealthCheck.java |  66 ++++++++++
 ...tMQMailQueueDeadLetterQueueHealthCheckTest.java | 133 +++++++++++++++++++++
 3 files changed, 203 insertions(+), 1 deletion(-)

diff --git a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
index 712354a21e..7d041c2db8 100644
--- a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
@@ -37,6 +37,7 @@ import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueueDeadLetterQueueHealthCheck;
 import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
 import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
 import org.apache.james.utils.InitializationOperation;
@@ -65,7 +66,9 @@ public class RabbitMQModule extends AbstractModule {
     protected void configure() {
         bind(SimpleConnectionPool.class).in(Scopes.SINGLETON);
 
-        Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
+        Multibinder<HealthCheck> healthCheckMultiBinder = Multibinder.newSetBinder(binder(), HealthCheck.class);
+        healthCheckMultiBinder.addBinding().to(RabbitMQHealthCheck.class);
+        healthCheckMultiBinder.addBinding().to(RabbitMQMailQueueDeadLetterQueueHealthCheck.class);
 
         Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class);
         reconnectionHandlerMultibinder.addBinding().to(SpoolerReconnectionHandler.class);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheck.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheck.java
new file mode 100644
index 0000000000..37d0b3a068
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheck.java
@@ -0,0 +1,66 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static org.apache.james.queue.api.MailQueueFactory.SPOOL;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class RabbitMQMailQueueDeadLetterQueueHealthCheck implements HealthCheck { // health check on the length of the mail queue's dead letter queue (DLQ)
+    public static final MailQueueName JAMES_MAIL_QUEUE_NAME = MailQueueName.fromString(SPOOL.asString()); // mail queue name built from MailQueueFactory.SPOOL
+    public static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQMailQueueDeadLetterQueueHealthCheck"); // returned by componentName() and attached to every Result
+    private static final String DEFAULT_VHOST = "/"; // used when the configuration declares no vhost
+
+    private final RabbitMQConfiguration configuration;
+    private final RabbitMQManagementAPI api;
+
+    @Inject
+    public RabbitMQMailQueueDeadLetterQueueHealthCheck(RabbitMQConfiguration configuration) {
+        this.configuration = configuration;
+        this.api = RabbitMQManagementAPI.from(configuration); // management API client built once, from the same configuration
+    }
+
+    @Override
+    public ComponentName componentName() {
+        return COMPONENT_NAME;
+    }
+
+    @Override
+    public Mono<Result> check() { // healthy when the DLQ is empty, degraded when it holds messages, unhealthy on any error
+        return Mono.fromCallable(() -> api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST), JAMES_MAIL_QUEUE_NAME.toDeadLetterQueueName())) // deferred until subscription; looks up the DLQ in the configured vhost
+            .map(queueDetails -> {
+                if (queueDetails.getQueueLength() != 0) { // any non-zero length is reported as degraded
+                    return Result.degraded(COMPONENT_NAME, "RabbitMQ dead letter queue of the mail queue contain messages. This might indicate transient failure on mail processing.");
+                }
+                return Result.healthy(COMPONENT_NAME);
+            })
+            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking RabbitMQMailQueueDeadLetterQueueHealthCheck", e))) // errors become an unhealthy Result carrying the cause instead of an error signal
+            .subscribeOn(Schedulers.boundedElastic()); // subscription runs on the bounded elastic scheduler; presumably queueDetails is a blocking call - confirm
+    }
+}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheckTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheckTest.java
new file mode 100644
index 0000000000..f32bd40a4e
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheckTest.java
@@ -0,0 +1,133 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.apache.james.queue.rabbitmq.RabbitMQMailQueueDeadLetterQueueHealthCheck.JAMES_MAIL_QUEUE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+class RabbitMQMailQueueDeadLetterQueueHealthCheckTest { // exercises the dead letter queue health check against the RabbitMQ provided by RabbitMQExtension
+    @RegisterExtension
+    RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
+        .isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);
+
+    public static final ImmutableMap<String, Object> NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of(); // empty argument map passed to queueDeclare
+
+    private Connection connection;
+    private Channel channel;
+    private RabbitMQMailQueueDeadLetterQueueHealthCheck testee;
+
+    @BeforeEach
+    void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException { // opens a fresh connection and channel and builds the check under test
+        ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+        connectionFactory.setNetworkRecoveryInterval(1000);
+        connection = connectionFactory.newConnection();
+        channel = connection.createChannel();
+        testee = new RabbitMQMailQueueDeadLetterQueueHealthCheck(rabbitMQ.getConfiguration());
+    }
+
+    @AfterEach
+    void tearDown(DockerRabbitMQ rabbitMQ) throws Exception { // best-effort close of the AMQP resources, then reset of the broker
+        closeQuietly(connection, channel); // NOTE(review): connection is closed before its channel; any resulting error is swallowed - confirm the order is intended
+        rabbitMQ.reset();
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenRabbitMQIsDown() throws Exception {
+        rabbitMQExtension.getRabbitMQ().stopApp(); // the check must report unhealthy once the broker app has been stopped
+
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnHealthyWhenDeadLetterQueueIsEmpty() throws Exception {
+        createDeadLetterQueue(channel); // queue exists but nothing is published to it
+
+        assertThat(testee.check().block().isHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnDegradedWhenDeadLetterQueueIsNotEmpty() throws Exception {
+        createDeadLetterQueue(channel);
+        publishAMessage(channel);
+        publishAMessage(channel);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded()); // polls the check rather than asserting once; presumably the reported queue length lags behind publishing - confirm
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenThereIsNoDeadLetterQueue() { // no queue is declared here, so the check is expected to report unhealthy
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    private void createDeadLetterQueue(Channel channel) throws IOException { // declares the exchange and the dead letter queue, then binds them with ROUTING_KEY
+        channel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE);
+        channel.queueDeclare(JAMES_MAIL_QUEUE_NAME.toDeadLetterQueueName(), DURABLE, !EXCLUSIVE, AUTO_DELETE, NO_QUEUE_DECLARE_ARGUMENTS).getQueue(); // NOTE(review): getQueue() result is discarded, and AUTO_DELETE is passed as-is while EXCLUSIVE is negated - confirm intended
+        channel.queueBind(JAMES_MAIL_QUEUE_NAME.toDeadLetterQueueName(), EXCHANGE_NAME, ROUTING_KEY);
+    }
+
+    private void publishAMessage(Channel channel) throws IOException { // publishes one UTF-8 "Hello, world!" message to EXCHANGE_NAME with ROUTING_KEY
+        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() // copies delivery mode, priority and content type from PERSISTENT_TEXT_PLAIN
+            .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+            .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+            .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .build();
+
+        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicProperties, "Hello, world!".getBytes(StandardCharsets.UTF_8));
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) { // closes each resource in argument order, ignoring failures
+        Arrays.stream(closeables).forEach(this::closeQuietly);
+    }
+
+    private void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception e) {
+            // ignore error: cleanup is best-effort (this also swallows the NullPointerException raised when closeable is null)
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org