You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/09/25 15:01:31 UTC

[james-project] 03/04: JAMES-3817 Implement health check for RabbitMQ dead letter queues of event bus

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f2659d9d671bdc49e879b0483e19d4b2078e160b
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Tue Sep 20 17:08:31 2022 +0700

    JAMES-3817 Implement health check for RabbitMQ dead letter queues of event bus
    
    Degraded if the total length of the dead letter queues is not equal to 0.
---
 .../org/apache/james/events/NamingStrategy.java    |   2 +
 ...RabbitMQEventBusDeadLetterQueueHealthCheck.java |  65 +++++++++
 ...itMQEventBusDeadLetterQueueHealthCheckTest.java | 155 +++++++++++++++++++++
 .../james/modules/event/JMAPEventBusModule.java    |   4 +-
 .../modules/event/RabbitMQEventBusModule.java      |   5 +
 5 files changed, 229 insertions(+), 2 deletions(-)

diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
index 80dc2d2a4f..6e34d15d2d 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
@@ -22,6 +22,8 @@ package org.apache.james.events;
 import reactor.rabbitmq.QueueSpecification;
 
 public class NamingStrategy {
+    public static final NamingStrategy JMAP_NAMING_STRATEGY = new NamingStrategy("jmapEvent");
+
     private final String prefix;
 
     public NamingStrategy(String prefix) {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheck.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheck.java
new file mode 100644
index 0000000000..6eeb2af045
--- /dev/null
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheck.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+
+import reactor.core.publisher.Mono;
+
+public class RabbitMQEventBusDeadLetterQueueHealthCheck implements HealthCheck {
+    public static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQEventBusDeadLetterQueueHealthCheck");
+    private static final String DEFAULT_VHOST = "/";
+
+    private final RabbitMQConfiguration configuration;
+    private final NamingStrategy mailboxEventNamingStrategy;
+    private final RabbitMQManagementAPI api;
+
+    @Inject
+    public RabbitMQEventBusDeadLetterQueueHealthCheck(RabbitMQConfiguration configuration, NamingStrategy mailboxEventNamingStrategy) {
+        this.configuration = configuration;
+        this.mailboxEventNamingStrategy = mailboxEventNamingStrategy;
+        this.api = RabbitMQManagementAPI.from(configuration);
+    }
+
+    @Override
+    public ComponentName componentName() {
+        return COMPONENT_NAME;
+    }
+
+    @Override
+    public Mono<Result> check() {
+        return Mono.fromCallable(() -> api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST), mailboxEventNamingStrategy.deadLetterQueue().getName()).getQueueLength())
+            .mergeWith(Mono.fromCallable(() -> api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST), NamingStrategy.JMAP_NAMING_STRATEGY.deadLetterQueue().getName()).getQueueLength()))
+            .reduce(Long::sum)
+            .map(queueSize -> {
+                if (queueSize != 0) {
+                    return Result.degraded(COMPONENT_NAME, "RabbitMQ dead letter queue of the event bus contain events. This might indicate transient failure on event processing.");
+                }
+                return Result.healthy(COMPONENT_NAME);
+            })
+            .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking RabbitMQEventBusDeadLetterHealthCheck", e)));
+    }
+}
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheckTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheckTest.java
new file mode 100644
index 0000000000..c5f2978fe1
--- /dev/null
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheckTest.java
@@ -0,0 +1,155 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+/** Verifies the health states reported for the mailbox and JMAP event bus dead letter queues against a Docker RabbitMQ. */
+class RabbitMQEventBusDeadLetterQueueHealthCheckTest {
+    @RegisterExtension
+    RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ().isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);
+
+    public static final ImmutableMap<String, Object> NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of();
+    public static final NamingStrategy MAILBOX_EVENTS_NAMING_STRATEGY = new NamingStrategy("mailboxEvents");
+    // Each dead letter queue gets its own routing key so that a published message reaches exactly one of them.
+    public static final String ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS = "mailboxEventsRoutingKey";
+    public static final String ROUTING_KEY_JMAP_EVENTS_EVENT_BUS = "jmapEventsRoutingKey";
+
+    private Connection connection;
+    private Channel channel;
+    private RabbitMQEventBusDeadLetterQueueHealthCheck testee;
+
+    @BeforeEach
+    void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException {
+        ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+        connectionFactory.setNetworkRecoveryInterval(1000);
+        connection = connectionFactory.newConnection();
+        channel = connection.createChannel();
+        testee = new RabbitMQEventBusDeadLetterQueueHealthCheck(rabbitMQ.getConfiguration(), MAILBOX_EVENTS_NAMING_STRATEGY);
+    }
+
+    @AfterEach
+    void tearDown(DockerRabbitMQ rabbitMQ) throws Exception {
+        closeQuietly(connection, channel);
+        rabbitMQ.reset();
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenRabbitMQIsDown() throws Exception {
+        rabbitMQExtension.getRabbitMQ().stopApp();
+
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnHealthyWhenDeadLetterQueuesAreEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+
+        assertThat(testee.check().block().isHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenThereIsNoDeadLetterQueue() {
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnDegradedWhenMailboxEventBusDeadLetterQueueIsNotEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+        publishAMessage(channel, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+    }
+
+    @Test
+    void healthCheckShouldReturnDegradedWhenJmapEventBusDeadLetterQueueIsNotEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+        publishAMessage(channel, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+    }
+
+    @Test
+    void healthCheckShouldReturnDegradedWhenDeadLetterQueuesAreNotEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+        publishAMessage(channel, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        publishAMessage(channel, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+    }
+
+    /** Declares the direct test exchange, then the strategy's dead letter queue, and binds that queue under {@code routingKey}. */
+    private void createDeadLetterQueue(Channel channel, NamingStrategy namingStrategy, String routingKey) throws IOException {
+        channel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE);
+        channel.queueDeclare(namingStrategy.deadLetterQueue().getName(), DURABLE, !EXCLUSIVE, AUTO_DELETE, NO_QUEUE_DECLARE_ARGUMENTS);
+        channel.queueBind(namingStrategy.deadLetterQueue().getName(), EXCHANGE_NAME, routingKey);
+    }
+
+    private void publishAMessage(Channel channel, String routingKey) throws IOException {
+        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
+            .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+            .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+            .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .build();
+        channel.basicPublish(EXCHANGE_NAME, routingKey, basicProperties, "Hello, world!".getBytes(StandardCharsets.UTF_8));
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) {
+        Arrays.stream(closeables).forEach(this::closeQuietly);
+    }
+
+    private void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception ignored) {
+            // best-effort cleanup: a failure to close must not fail the test
+        }
+    }
+}
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
index dbff6cd7a3..25fa79f7e9 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.modules.event;
 
+import static org.apache.james.events.NamingStrategy.JMAP_NAMING_STRATEGY;
+
 import javax.inject.Named;
 
 import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
@@ -30,7 +32,6 @@ import org.apache.james.events.EventBusId;
 import org.apache.james.events.EventBusReconnectionHandler;
 import org.apache.james.events.EventDeadLetters;
 import org.apache.james.events.KeyReconnectionHandler;
-import org.apache.james.events.NamingStrategy;
 import org.apache.james.events.RabbitMQEventBus;
 import org.apache.james.events.RetryBackoffConfiguration;
 import org.apache.james.events.RoutingKeyConverter;
@@ -52,7 +53,6 @@ import com.google.inject.name.Names;
 import reactor.rabbitmq.Sender;
 
 public class JMAPEventBusModule extends AbstractModule {
-    public static final NamingStrategy JMAP_NAMING_STRATEGY = new NamingStrategy("jmapEvent");
 
     @Override
     protected void configure() {
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
index 16f3196c88..2eb8c655a0 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -20,6 +20,7 @@
 package org.apache.james.modules.event;
 
 import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.event.json.MailboxEventSerializer;
 import org.apache.james.events.EventBus;
 import org.apache.james.events.EventBusId;
@@ -28,6 +29,7 @@ import org.apache.james.events.EventSerializer;
 import org.apache.james.events.KeyReconnectionHandler;
 import org.apache.james.events.NamingStrategy;
 import org.apache.james.events.RabbitMQEventBus;
+import org.apache.james.events.RabbitMQEventBusDeadLetterQueueHealthCheck;
 import org.apache.james.events.RegistrationKey;
 import org.apache.james.events.RetryBackoffConfiguration;
 import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
@@ -63,6 +65,9 @@ public class RabbitMQEventBusModule extends AbstractModule {
         Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class);
         reconnectionHandlerMultibinder.addBinding().to(KeyReconnectionHandler.class);
         reconnectionHandlerMultibinder.addBinding().to(EventBusReconnectionHandler.class);
+
+        Multibinder.newSetBinder(binder(), HealthCheck.class)
+            .addBinding().to(RabbitMQEventBusDeadLetterQueueHealthCheck.class);
     }
 
     @ProvidesIntoSet


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org