You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/09/25 15:01:30 UTC
[james-project] 02/04: JAMES-3817 Implement health check for RabbitMQ dead letter queue of mail queue
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 709609759f56425e5d42840ff11b9f55ca1a40e7
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Tue Sep 20 16:13:41 2022 +0700
JAMES-3817 Implement health check for RabbitMQ dead letter queue of mail queue
Degraded if dead letter queue length is not equal to 0.
---
.../modules/queue/rabbitmq/RabbitMQModule.java | 5 +-
...abbitMQMailQueueDeadLetterQueueHealthCheck.java | 66 ++++++++++
...tMQMailQueueDeadLetterQueueHealthCheckTest.java | 133 +++++++++++++++++++++
3 files changed, 203 insertions(+), 1 deletion(-)
diff --git a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
index 712354a21e..7d041c2db8 100644
--- a/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
+++ b/server/container/guice/queue/rabbitmq/src/main/java/org/apache/james/modules/queue/rabbitmq/RabbitMQModule.java
@@ -37,6 +37,7 @@ import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueue;
+import org.apache.james.queue.rabbitmq.RabbitMQMailQueueDeadLetterQueueHealthCheck;
import org.apache.james.queue.rabbitmq.RabbitMQMailQueueFactory;
import org.apache.james.queue.rabbitmq.view.RabbitMQMailQueueConfiguration;
import org.apache.james.utils.InitializationOperation;
@@ -65,7 +66,9 @@ public class RabbitMQModule extends AbstractModule {
protected void configure() {
bind(SimpleConnectionPool.class).in(Scopes.SINGLETON);
- Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(RabbitMQHealthCheck.class);
+ Multibinder<HealthCheck> healthCheckMultiBinder = Multibinder.newSetBinder(binder(), HealthCheck.class);
+ healthCheckMultiBinder.addBinding().to(RabbitMQHealthCheck.class);
+ healthCheckMultiBinder.addBinding().to(RabbitMQMailQueueDeadLetterQueueHealthCheck.class);
Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class);
reconnectionHandlerMultibinder.addBinding().to(SpoolerReconnectionHandler.class);
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheck.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheck.java
new file mode 100644
index 0000000000..37d0b3a068
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheck.java
@@ -0,0 +1,66 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static org.apache.james.queue.api.MailQueueFactory.SPOOL;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class RabbitMQMailQueueDeadLetterQueueHealthCheck implements HealthCheck { // Health check that inspects the dead letter queue of the SPOOL mail queue through the RabbitMQ management API.
+ public static final MailQueueName JAMES_MAIL_QUEUE_NAME = MailQueueName.fromString(SPOOL.asString()); // mail queue whose dead letter queue is inspected, named after MailQueueFactory.SPOOL
+ public static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQMailQueueDeadLetterQueueHealthCheck"); // name attached to every Result returned by check()
+ private static final String DEFAULT_VHOST = "/"; // vhost used by check() when the configuration does not declare one
+
+ private final RabbitMQConfiguration configuration;
+ private final RabbitMQManagementAPI api;
+
+ @Inject
+ public RabbitMQMailQueueDeadLetterQueueHealthCheck(RabbitMQConfiguration configuration) {
+ this.configuration = configuration;
+ this.api = RabbitMQManagementAPI.from(configuration); // management API client is built once, from the injected configuration
+ }
+
+ @Override
+ public ComponentName componentName() {
+ return COMPONENT_NAME;
+ }
+
+ @Override
+ public Mono<Result> check() { // healthy when the dead letter queue length is 0, degraded when it is not, unhealthy when the lookup raises an error
+ return Mono.fromCallable(() -> api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST), JAMES_MAIL_QUEUE_NAME.toDeadLetterQueueName())) // fetch the dead letter queue details in the configured vhost, falling back to DEFAULT_VHOST
+ .map(queueDetails -> {
+ if (queueDetails.getQueueLength() != 0) { // any non-zero length downgrades the result to degraded
+ return Result.degraded(COMPONENT_NAME, "RabbitMQ dead letter queue of the mail queue contain messages. This might indicate transient failure on mail processing.");
+ }
+ return Result.healthy(COMPONENT_NAME);
+ })
+ .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking RabbitMQMailQueueDeadLetterQueueHealthCheck", e))) // errors from the lookup or the mapping become an unhealthy Result instead of an error signal
+ .subscribeOn(Schedulers.boundedElastic()); // NOTE(review): presumably because queueDetails is a blocking call - confirm
+ }
+}
diff --git a/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheckTest.java b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheckTest.java
new file mode 100644
index 0000000000..f32bd40a4e
--- /dev/null
+++ b/server/queue/queue-rabbitmq/src/test/java/org/apache/james/queue/rabbitmq/RabbitMQMailQueueDeadLetterQueueHealthCheckTest.java
@@ -0,0 +1,133 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.rabbitmq;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.ROUTING_KEY;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.apache.james.queue.rabbitmq.RabbitMQMailQueueDeadLetterQueueHealthCheck.JAMES_MAIL_QUEUE_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+class RabbitMQMailQueueDeadLetterQueueHealthCheckTest { // Exercises RabbitMQMailQueueDeadLetterQueueHealthCheck against the DockerRabbitMQ instance supplied by RabbitMQExtension.
+ @RegisterExtension
+ RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
+ .isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);
+
+ public static final ImmutableMap<String, Object> NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of(); // empty argument map passed to queueDeclare
+
+ private Connection connection;
+ private Channel channel;
+ private RabbitMQMailQueueDeadLetterQueueHealthCheck testee;
+
+ @BeforeEach
+ void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException { // opens a fresh connection and channel and builds the testee from the broker's configuration
+ ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+ connectionFactory.setNetworkRecoveryInterval(1000);
+ connection = connectionFactory.newConnection();
+ channel = connection.createChannel();
+ testee = new RabbitMQMailQueueDeadLetterQueueHealthCheck(rabbitMQ.getConfiguration());
+ }
+
+ @AfterEach
+ void tearDown(DockerRabbitMQ rabbitMQ) throws Exception { // closes the connection and channel (ignoring failures), then resets the broker
+ closeQuietly(connection, channel);
+ rabbitMQ.reset();
+ }
+
+ @Test
+ void healthCheckShouldReturnUnhealthyWhenRabbitMQIsDown() throws Exception {
+ rabbitMQExtension.getRabbitMQ().stopApp(); // stop RabbitMQ before checking; the check is expected to report unhealthy
+
+ assertThat(testee.check().block().isUnHealthy()).isTrue();
+ }
+
+ @Test
+ void healthCheckShouldReturnHealthyWhenDeadLetterQueueIsEmpty() throws Exception {
+ createDeadLetterQueue(channel); // queue exists but nothing is published to it
+
+ assertThat(testee.check().block().isHealthy()).isTrue();
+ }
+
+ @Test
+ void healthCheckShouldReturnDegradedWhenDeadLetterQueueIsNotEmpty() throws Exception {
+ createDeadLetterQueue(channel);
+ publishAMessage(channel);
+ publishAMessage(channel);
+
+ awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded()); // re-runs check() until it reports degraded; NOTE(review): presumably the reported queue length lags behind the publishes - confirm
+ }
+
+ @Test
+ void healthCheckShouldReturnUnhealthyWhenThereIsNoDeadLetterQueue() { // no queue is declared here, and the check is expected to report unhealthy
+ assertThat(testee.check().block().isUnHealthy()).isTrue();
+ }
+
+ private void createDeadLetterQueue(Channel channel) throws IOException { // declares the exchange and the dead letter queue, then binds them with ROUTING_KEY
+ channel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE);
+ channel.queueDeclare(JAMES_MAIL_QUEUE_NAME.toDeadLetterQueueName(), DURABLE, !EXCLUSIVE, AUTO_DELETE, NO_QUEUE_DECLARE_ARGUMENTS).getQueue(); // the queue name returned by getQueue() is not used
+ channel.queueBind(JAMES_MAIL_QUEUE_NAME.toDeadLetterQueueName(), EXCHANGE_NAME, ROUTING_KEY);
+ }
+
+ private void publishAMessage(Channel channel) throws IOException { // publishes one message to EXCHANGE_NAME with ROUTING_KEY, the binding set up by createDeadLetterQueue
+ AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() // copies delivery mode, priority and content type from PERSISTENT_TEXT_PLAIN
+ .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+ .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+ .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+ .build();
+
+ channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, basicProperties, "Hello, world!".getBytes(StandardCharsets.UTF_8));
+ }
+
+ private void closeQuietly(AutoCloseable... closeables) { // closes each argument in the order given, ignoring failures
+ Arrays.stream(closeables).forEach(this::closeQuietly);
+ }
+
+ private void closeQuietly(AutoCloseable closeable) {
+ try {
+ closeable.close();
+ } catch (Exception e) {
+ // deliberately ignored: close failures during test cleanup are not propagated
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org