You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/08/05 01:27:34 UTC

[james-project] branch master updated: JAMES-3794 Implement ActiveMQ HealthCheck (#1105)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d6770bad4 JAMES-3794 Implement ActiveMQ HealthCheck (#1105)
9d6770bad4 is described below

commit 9d6770bad4afe0d504efa99546c98f156cea2a6c
Author: Trần Hồng Quân <55...@users.noreply.github.com>
AuthorDate: Fri Aug 5 08:27:30 2022 +0700

    JAMES-3794 Implement ActiveMQ HealthCheck (#1105)
---
 .../queue/activemq/ActiveMQQueueModule.java        |  5 ++
 .../james/queue/activemq/ActiveMQHealthCheck.java  | 66 ++++++++++++++++++++
 .../queue/activemq/ActiveMQHealthCheckTest.java    | 70 ++++++++++++++++++++++
 3 files changed, 141 insertions(+)

diff --git a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
index 58e02d8961..d9609ae358 100644
--- a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
+++ b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
@@ -23,6 +23,8 @@ import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.queue.activemq.ActiveMQHealthCheck;
 import org.apache.james.queue.activemq.ActiveMQMailQueueFactory;
 import org.apache.james.queue.activemq.EmbeddedActiveMQ;
 import org.apache.james.queue.api.MailQueue;
@@ -33,6 +35,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
+import com.google.inject.multibindings.Multibinder;
 
 public class ActiveMQQueueModule extends AbstractModule {
 
@@ -42,6 +45,8 @@ public class ActiveMQQueueModule extends AbstractModule {
         bind(KahaDBPersistenceAdapter.class).in(Scopes.SINGLETON);
         bind(EmbeddedActiveMQ.class).in(Scopes.SINGLETON);
         bind(ActiveMQMailQueueFactory.class).in(Scopes.SINGLETON);
+
+        Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(ActiveMQHealthCheck.class);
     }
     
     @Provides
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
new file mode 100644
index 0000000000..989ba639f9
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
@@ -0,0 +1,66 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq;
+
+import javax.inject.Inject;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+
+import org.apache.james.core.healthcheck.ComponentName;
+import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.core.healthcheck.Result;
+import org.reactivestreams.Publisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import reactor.core.publisher.Mono;
+
+public class ActiveMQHealthCheck implements HealthCheck {
+    public static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQHealthCheck.class);
+    public static final ComponentName COMPONENT_NAME = new ComponentName("Embedded ActiveMQ");
+    private final ConnectionFactory connectionFactory;
+
+    @Inject
+    public ActiveMQHealthCheck(ConnectionFactory connectionFactory) {
+        this.connectionFactory = connectionFactory;
+    }
+
+    @Override
+    public ComponentName componentName() {
+        return COMPONENT_NAME;
+    }
+
+    @Override
+    public Publisher<Result> check() {
+        return Mono.fromCallable(() -> {
+            try {
+                Connection connection = connectionFactory.createConnection();
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                session.close();
+                return Result.healthy(COMPONENT_NAME);
+            } catch (Exception e) {
+                LOGGER.warn("{} is unhealthy. {}", COMPONENT_NAME.getName(), e.getMessage());
+                return Result.unhealthy(COMPONENT_NAME, e.getMessage());
+            }
+        });
+    }
+}
+
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
new file mode 100644
index 0000000000..f54851a487
--- /dev/null
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
@@ -0,0 +1,70 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq;
+
+import static org.apache.james.queue.activemq.ActiveMQHealthCheck.COMPONENT_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.james.core.healthcheck.Result;
+import org.apache.james.queue.jms.BrokerExtension;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import reactor.core.publisher.Mono;
+
+@ExtendWith(BrokerExtension.class)
+class ActiveMQHealthCheckTest {
+    private ActiveMQHealthCheck testee;
+    private BrokerService broker;
+
+    @BeforeEach
+    void setup(BrokerService broker) {
+        this.broker = broker;
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(0);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
+
+        testee = new ActiveMQHealthCheck(connectionFactory);
+    }
+
+    @Test
+    void componentNameShouldReturnTheRightValue() {
+        assertThat(testee.componentName().getName())
+            .isEqualTo(COMPONENT_NAME.getName());
+    }
+
+    @Test
+    void checkShouldReturnHealthyWhenActiveMQHealthy() {
+        assertThat(Mono.from(testee.check()).block())
+            .isEqualTo(Result.healthy(COMPONENT_NAME));
+    }
+
+    @Test
+    void checkShouldReturnUnHealthyWhenActiveMQDown() throws Exception {
+        broker.stop();
+        assertThat(Mono.from(testee.check()).block().isUnHealthy()).isTrue();
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org