You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/09/25 15:01:31 UTC
[james-project] 03/04: JAMES-3817 Implement health check for RabbitMQ dead letter queues of event bus
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f2659d9d671bdc49e879b0483e19d4b2078e160b
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Tue Sep 20 17:08:31 2022 +0700
JAMES-3817 Implement health check for RabbitMQ dead letter queues of event bus
Degraded if the total length of the dead letter queues is not equal to 0.
---
.../org/apache/james/events/NamingStrategy.java | 2 +
...RabbitMQEventBusDeadLetterQueueHealthCheck.java | 65 +++++++++
...itMQEventBusDeadLetterQueueHealthCheckTest.java | 155 +++++++++++++++++++++
.../james/modules/event/JMAPEventBusModule.java | 4 +-
.../modules/event/RabbitMQEventBusModule.java | 5 +
5 files changed, 229 insertions(+), 2 deletions(-)
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
index 80dc2d2a4f..6e34d15d2d 100644
--- a/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/NamingStrategy.java
@@ -22,6 +22,8 @@ package org.apache.james.events;
import reactor.rabbitmq.QueueSpecification;
public class NamingStrategy {
+ public static final NamingStrategy JMAP_NAMING_STRATEGY = new NamingStrategy("jmapEvent");
+
private final String prefix;
public NamingStrategy(String prefix) {
diff --git a/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheck.java b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheck.java
new file mode 100644
index 0000000000..6eeb2af045
--- /dev/null
+++ b/event-bus/distributed/src/main/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheck.java
@@ -0,0 +1,65 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
+import org.apache.james.backends.rabbitmq.RabbitMQManagementAPI;
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class RabbitMQEventBusDeadLetterQueueHealthCheck implements HealthCheck {
+ public static final ComponentName COMPONENT_NAME = new ComponentName("RabbitMQEventBusDeadLetterQueueHealthCheck");
+ private static final String DEFAULT_VHOST = "/";
+
+ private final RabbitMQConfiguration configuration;
+ private final NamingStrategy mailboxEventNamingStrategy;
+ private final RabbitMQManagementAPI api;
+
+ @Inject
+ public RabbitMQEventBusDeadLetterQueueHealthCheck(RabbitMQConfiguration configuration, NamingStrategy mailboxEventNamingStrategy) {
+ this.configuration = configuration;
+ this.mailboxEventNamingStrategy = mailboxEventNamingStrategy;
+ this.api = RabbitMQManagementAPI.from(configuration);
+ }
+
+ @Override
+ public ComponentName componentName() {
+ return COMPONENT_NAME;
+ }
+
+ @Override
+ public Mono<Result> check() {
+ return Mono.fromCallable(() -> api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST), mailboxEventNamingStrategy.deadLetterQueue().getName()).getQueueLength())
+ .mergeWith(Mono.fromCallable(() -> api.queueDetails(configuration.getVhost().orElse(DEFAULT_VHOST), NamingStrategy.JMAP_NAMING_STRATEGY.deadLetterQueue().getName()).getQueueLength()))
+ .reduce(Long::sum)
+ .map(queueSize -> {
+ if (queueSize != 0) {
+ return Result.degraded(COMPONENT_NAME, "RabbitMQ dead letter queue of the event bus contain events. This might indicate transient failure on event processing.");
+ }
+ return Result.healthy(COMPONENT_NAME);
+ })
+ .onErrorResume(e -> Mono.just(Result.unhealthy(COMPONENT_NAME, "Error checking RabbitMQEventBusDeadLetterHealthCheck", e)));
+ }
+}
diff --git a/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheckTest.java b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheckTest.java
new file mode 100644
index 0000000000..c5f2978fe1
--- /dev/null
+++ b/event-bus/distributed/src/test/java/org/apache/james/events/RabbitMQEventBusDeadLetterQueueHealthCheckTest.java
@@ -0,0 +1,155 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.events;
+
+import static com.rabbitmq.client.MessageProperties.PERSISTENT_TEXT_PLAIN;
+import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE;
+import static org.apache.james.backends.rabbitmq.Constants.DIRECT_EXCHANGE;
+import static org.apache.james.backends.rabbitmq.Constants.DURABLE;
+import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.EXCHANGE_NAME;
+import static org.apache.james.backends.rabbitmq.RabbitMQFixture.awaitAtMostOneMinute;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.james.backends.rabbitmq.DockerRabbitMQ;
+import org.apache.james.backends.rabbitmq.RabbitMQExtension;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import com.google.common.collect.ImmutableMap;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+
+/**
+ * Exercises {@link RabbitMQEventBusDeadLetterQueueHealthCheck} against a RabbitMQ instance
+ * provided by {@link RabbitMQExtension}.
+ *
+ * <p>Each event bus gets its own dead letter queue, bound to the test exchange with its own
+ * routing key, so that a published message reaches exactly one of the two queues.
+ */
+class RabbitMQEventBusDeadLetterQueueHealthCheckTest {
+    @RegisterExtension
+    RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ()
+        .isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG);
+
+    public static final ImmutableMap<String, Object> NO_QUEUE_DECLARE_ARGUMENTS = ImmutableMap.of();
+    public static final NamingStrategy MAILBOX_EVENTS_NAMING_STRATEGY = new NamingStrategy("mailboxEvents");
+    public static final String ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS = "mailboxEventsRoutingKey";
+    // Must differ from the mailbox routing key: with identical keys the direct exchange
+    // delivers every message to both dead letter queues and the per-bus tests overlap.
+    public static final String ROUTING_KEY_JMAP_EVENTS_EVENT_BUS = "jmapEventsRoutingKey";
+
+    private Connection connection;
+    private Channel channel;
+    private RabbitMQEventBusDeadLetterQueueHealthCheck testee;
+
+    @BeforeEach
+    void setup(DockerRabbitMQ rabbitMQ) throws IOException, TimeoutException, URISyntaxException {
+        ConnectionFactory connectionFactory = rabbitMQ.connectionFactory();
+        connectionFactory.setNetworkRecoveryInterval(1000);
+        connection = connectionFactory.newConnection();
+        channel = connection.createChannel();
+        testee = new RabbitMQEventBusDeadLetterQueueHealthCheck(rabbitMQ.getConfiguration(), MAILBOX_EVENTS_NAMING_STRATEGY);
+    }
+
+    @AfterEach
+    void tearDown(DockerRabbitMQ rabbitMQ) throws Exception {
+        // Close the channel before the connection that owns it.
+        closeQuietly(channel, connection);
+        rabbitMQ.reset();
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenRabbitMQIsDown() throws Exception {
+        rabbitMQExtension.getRabbitMQ().stopApp();
+
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnHealthyWhenDeadLetterQueuesAreEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+
+        assertThat(testee.check().block().isHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnUnhealthyWhenThereIsNoDeadLetterQueue() {
+        assertThat(testee.check().block().isUnHealthy()).isTrue();
+    }
+
+    @Test
+    void healthCheckShouldReturnDegradedWhenMailboxEventBusDeadLetterQueueIsNotEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+        publishAMessage(channel, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+    }
+
+    @Test
+    void healthCheckShouldReturnDegradedWhenJmapEventBusDeadLetterQueueIsNotEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+        publishAMessage(channel, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+    }
+
+    @Test
+    void healthCheckShouldReturnDegradedWhenDeadLetterQueuesAreNotEmpty() throws Exception {
+        createDeadLetterQueue(channel, MAILBOX_EVENTS_NAMING_STRATEGY, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        createDeadLetterQueue(channel, NamingStrategy.JMAP_NAMING_STRATEGY, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+
+        publishAMessage(channel, ROUTING_KEY_MAILBOX_EVENTS_EVENT_BUS);
+        publishAMessage(channel, ROUTING_KEY_JMAP_EVENTS_EVENT_BUS);
+
+        awaitAtMostOneMinute.until(() -> testee.check().block().isDegraded());
+    }
+
+    // Declares the test exchange and the strategy's dead letter queue, then binds the queue
+    // to the exchange under the given routing key.
+    private void createDeadLetterQueue(Channel channel, NamingStrategy namingStrategy, String routingKey) throws IOException {
+        String queueName = namingStrategy.deadLetterQueue().getName();
+
+        channel.exchangeDeclare(EXCHANGE_NAME, DIRECT_EXCHANGE, DURABLE);
+        channel.queueDeclare(queueName, DURABLE, !EXCLUSIVE, AUTO_DELETE, NO_QUEUE_DECLARE_ARGUMENTS);
+        channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
+    }
+
+    // Publishes one persistent text message to the test exchange with the given routing key.
+    private void publishAMessage(Channel channel, String routingKey) throws IOException {
+        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
+            .deliveryMode(PERSISTENT_TEXT_PLAIN.getDeliveryMode())
+            .priority(PERSISTENT_TEXT_PLAIN.getPriority())
+            .contentType(PERSISTENT_TEXT_PLAIN.getContentType())
+            .build();
+
+        channel.basicPublish(EXCHANGE_NAME, routingKey, basicProperties, "Hello, world!".getBytes(StandardCharsets.UTF_8));
+    }
+
+    private void closeQuietly(AutoCloseable... closeables) {
+        Arrays.stream(closeables).forEach(this::closeQuietly);
+    }
+
+    // Best-effort close used during teardown.
+    private void closeQuietly(AutoCloseable closeable) {
+        try {
+            closeable.close();
+        } catch (Exception ignored) {
+            // Deliberately ignored: a failure to close (e.g. the broker was stopped by the
+            // test, or setup never assigned the resource) must not fail teardown.
+        }
+    }
+}
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
index dbff6cd7a3..25fa79f7e9 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -19,6 +19,8 @@
package org.apache.james.modules.event;
+import static org.apache.james.events.NamingStrategy.JMAP_NAMING_STRATEGY;
+
import javax.inject.Named;
import org.apache.james.backends.rabbitmq.RabbitMQConfiguration;
@@ -30,7 +32,6 @@ import org.apache.james.events.EventBusId;
import org.apache.james.events.EventBusReconnectionHandler;
import org.apache.james.events.EventDeadLetters;
import org.apache.james.events.KeyReconnectionHandler;
-import org.apache.james.events.NamingStrategy;
import org.apache.james.events.RabbitMQEventBus;
import org.apache.james.events.RetryBackoffConfiguration;
import org.apache.james.events.RoutingKeyConverter;
@@ -52,7 +53,6 @@ import com.google.inject.name.Names;
import reactor.rabbitmq.Sender;
public class JMAPEventBusModule extends AbstractModule {
- public static final NamingStrategy JMAP_NAMING_STRATEGY = new NamingStrategy("jmapEvent");
@Override
protected void configure() {
diff --git a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
index 16f3196c88..2eb8c655a0 100644
--- a/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
+++ b/server/container/guice/distributed/src/main/java/org/apache/james/modules/event/RabbitMQEventBusModule.java
@@ -20,6 +20,7 @@
package org.apache.james.modules.event;
import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.event.json.MailboxEventSerializer;
import org.apache.james.events.EventBus;
import org.apache.james.events.EventBusId;
@@ -28,6 +29,7 @@ import org.apache.james.events.EventSerializer;
import org.apache.james.events.KeyReconnectionHandler;
import org.apache.james.events.NamingStrategy;
import org.apache.james.events.RabbitMQEventBus;
+import org.apache.james.events.RabbitMQEventBusDeadLetterQueueHealthCheck;
import org.apache.james.events.RegistrationKey;
import org.apache.james.events.RetryBackoffConfiguration;
import org.apache.james.mailbox.events.MailboxIdRegistrationKey;
@@ -63,6 +65,9 @@ public class RabbitMQEventBusModule extends AbstractModule {
Multibinder<SimpleConnectionPool.ReconnectionHandler> reconnectionHandlerMultibinder = Multibinder.newSetBinder(binder(), SimpleConnectionPool.ReconnectionHandler.class);
reconnectionHandlerMultibinder.addBinding().to(KeyReconnectionHandler.class);
reconnectionHandlerMultibinder.addBinding().to(EventBusReconnectionHandler.class);
+
+ Multibinder.newSetBinder(binder(), HealthCheck.class)
+ .addBinding().to(RabbitMQEventBusDeadLetterQueueHealthCheck.class);
}
@ProvidesIntoSet
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org