You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/12/01 05:08:54 UTC

[james-project] 01/08: [JAMES-3841] ActiveMQ: replace simple connection health check with retrieving ActiveMQ broker stats

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 80d12a2edc40596b3dc1c6c2961ac64e443e036a
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Wed Nov 2 23:55:04 2022 +0100

    [JAMES-3841] ActiveMQ: replace simple connection health check with retrieving ActiveMQ broker stats
---
 .../queue/activemq/ActiveMQBrokerStatistics.java   | 229 +++++++++++++++++++++
 .../james/queue/activemq/ActiveMQHealthCheck.java  |  95 ++++++++-
 .../james/queue/activemq/EmbeddedActiveMQ.java     |   1 +
 .../queue/activemq/ActiveMQHealthCheckTest.java    |   6 +-
 4 files changed, 325 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
new file mode 100644
index 0000000000..e64ff380d9
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
@@ -0,0 +1,229 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+public class ActiveMQBrokerStatistics {
+
+    public static final String STATS_QUEUE_NAME = "ActiveMQ.Statistics.Broker";
+
+    private static final String TEMP_PERCENT_USAGE = "tempPercentUsage";
+    private static final String STORE_PERCENT_USAGE = "storePercentUsage";
+    private static final String MEMORY_PERCENT_USAGE = "memoryPercentUsage";
+
+    private static final String STORE_LIMIT = "storeLimit";
+    private static final String MEMORY_LIMIT = "memoryLimit";
+    private static final String TEMP_LIMIT = "tempLimit";
+
+    private static final String MEMORY_USAGE = "memoryUsage";
+    private static final String STORE_USAGE = "storeUsage";
+    private static final String TEMP_USAGE = "tempUsage";
+
+    private static final String SIZE = "size";
+    private static final String ENQUEUE_COUNT = "enqueueCount";
+    private static final String DEQUEUE_COUNT = "dequeueCount";
+    private static final String INFLIGHT_COUNT = "inflightCount";
+    private static final String PRODUCER_COUNT = "producerCount";
+    private static final String CONSUMER_COUNT = "consumerCount";
+    private static final String EXPIRED_COUNT = "expiredCount";
+    private static final String DISPATCH_COUNT = "dispatchCount";
+    private static final String MESSAGES_CACHED = "messagesCached";
+
+    private static final String MIN_ENQUEUE_TIME = "minEnqueueTime";
+    private static final String MAX_ENQUEUE_TIME = "maxEnqueueTime";
+    private static final String AVERAGE_ENQUEUE_TIME = "averageEnqueueTime";
+
+    private final AtomicLong lastUpdate = new AtomicLong();
+
+    private final AtomicInteger memoryPercentUsage = new AtomicInteger();
+    private final AtomicInteger storePercentUsage = new AtomicInteger();
+    private final AtomicInteger tempPercentUsage = new AtomicInteger();
+
+    private final AtomicLong memoryLimit = new AtomicLong();
+    private final AtomicLong storeLimit = new AtomicLong();
+    private final AtomicLong tempLimit = new AtomicLong();
+
+    private final AtomicLong memoryUsage = new AtomicLong();
+    private final AtomicLong storeUsage = new AtomicLong();
+    private final AtomicLong tempUsage = new AtomicLong();
+
+    private final AtomicLong size = new AtomicLong();
+    private final AtomicLong enqueueCount = new AtomicLong();
+    private final AtomicLong dequeueCount = new AtomicLong();
+    private final AtomicLong inflightCount = new AtomicLong();
+    private final AtomicLong producerCount = new AtomicLong();
+    private final AtomicLong consumerCount = new AtomicLong();
+    private final AtomicLong expiredCount = new AtomicLong();
+    private final AtomicLong dispatchCount = new AtomicLong();
+    private final AtomicLong messagesCached = new AtomicLong();
+
+    private final AtomicDouble minEnqueueTime = new AtomicDouble();
+    private final AtomicDouble maxEnqueueTime = new AtomicDouble();
+    private final AtomicDouble averageEnqueueTime = new AtomicDouble();
+
+    public ActiveMQBrokerStatistics(GaugeRegistry gaugeRegistry) {
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_PERCENT_USAGE, memoryPercentUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_PERCENT_USAGE, storePercentUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_PERCENT_USAGE, tempPercentUsage::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_LIMIT, memoryLimit::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_LIMIT, storeLimit::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_LIMIT, tempLimit::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_USAGE, memoryUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_USAGE, storeUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_USAGE, tempUsage::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + SIZE, size::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + ENQUEUE_COUNT, enqueueCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + DEQUEUE_COUNT, dequeueCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + INFLIGHT_COUNT, inflightCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + PRODUCER_COUNT, producerCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + CONSUMER_COUNT, consumerCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + EXPIRED_COUNT, expiredCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + DISPATCH_COUNT, dispatchCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MESSAGES_CACHED, messagesCached::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MIN_ENQUEUE_TIME, minEnqueueTime::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MAX_ENQUEUE_TIME, maxEnqueueTime::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + AVERAGE_ENQUEUE_TIME, averageEnqueueTime::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + ".lastUpdate", lastUpdate::get);
+    }
+
+    public void update(MapMessage msg) throws JMSException {
+        if (msg.itemExists(MEMORY_PERCENT_USAGE)) {
+            memoryPercentUsage.set(msg.getInt(MEMORY_PERCENT_USAGE));
+        }
+        if (msg.itemExists(STORE_PERCENT_USAGE)) {
+            storePercentUsage.set(msg.getInt(STORE_PERCENT_USAGE));
+        }
+        if (msg.itemExists(TEMP_PERCENT_USAGE)) {
+            tempPercentUsage.set(msg.getInt(TEMP_PERCENT_USAGE));
+        }
+
+        if (msg.itemExists(MEMORY_LIMIT)) {
+            memoryLimit.set(msg.getLong(MEMORY_LIMIT));
+        }
+        if (msg.itemExists(STORE_LIMIT)) {
+            storeLimit.set(msg.getLong(STORE_LIMIT));
+        }
+        if (msg.itemExists(TEMP_LIMIT)) {
+            tempLimit.set(msg.getLong(TEMP_LIMIT));
+        }
+
+        if (msg.itemExists(MEMORY_USAGE)) {
+            memoryUsage.set(msg.getLong(MEMORY_USAGE));
+        }
+        if (msg.itemExists(STORE_USAGE)) {
+            storeUsage.set(msg.getLong(STORE_USAGE));
+        }
+        if (msg.itemExists(TEMP_USAGE)) {
+            tempUsage.set(msg.getLong(TEMP_USAGE));
+        }
+
+        if (msg.itemExists(SIZE)) {
+            size.set(msg.getLong(SIZE));
+        }
+        if (msg.itemExists(ENQUEUE_COUNT)) {
+            enqueueCount.set(msg.getLong(ENQUEUE_COUNT));
+        }
+        if (msg.itemExists(DEQUEUE_COUNT)) {
+            dequeueCount.set(msg.getLong(DEQUEUE_COUNT));
+        }
+        if (msg.itemExists(INFLIGHT_COUNT)) {
+            inflightCount.set(msg.getLong(INFLIGHT_COUNT));
+        }
+        if (msg.itemExists(PRODUCER_COUNT)) {
+            producerCount.set(msg.getLong(PRODUCER_COUNT));
+        }
+        if (msg.itemExists(CONSUMER_COUNT)) {
+            consumerCount.set(msg.getLong(CONSUMER_COUNT));
+        }
+        if (msg.itemExists(EXPIRED_COUNT)) {
+            expiredCount.set(msg.getLong(EXPIRED_COUNT));
+        }
+        if (msg.itemExists(DISPATCH_COUNT)) {
+            dispatchCount.set(msg.getLong(DISPATCH_COUNT));
+        }
+        if (msg.itemExists(MESSAGES_CACHED)) {
+            messagesCached.set(msg.getLong(MESSAGES_CACHED));
+        }
+
+        if (msg.itemExists(MIN_ENQUEUE_TIME)) {
+            minEnqueueTime.set(msg.getDouble(MIN_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(MAX_ENQUEUE_TIME)) {
+            maxEnqueueTime.set(msg.getDouble(MAX_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(AVERAGE_ENQUEUE_TIME)) {
+            averageEnqueueTime.set(msg.getDouble(AVERAGE_ENQUEUE_TIME));
+        }
+
+        lastUpdate.set(System.currentTimeMillis());
+    }
+
+    public long getLastUpdate() {
+        return lastUpdate.get();
+    }
+
+    /* Sample key=value entries of a broker statistics reply; update() reads those marked "-":
+    vm=vm://localhost
+    - memoryUsage=0
+    - storeUsage=3330
+    - tempPercentUsage=0
+    ssl=
+    openwire=tcp://localhost:50059
+    brokerId=ID:bigmac-50057-1253605065511-0:0
+    - consumerCount=2
+    brokerName=localhost
+    - expiredCount=0
+    - dispatchCount=1
+    - maxEnqueueTime=5.0
+    - storePercentUsage=0
+    - dequeueCount=0
+    - inflightCount=1
+    - messagesCached=0
+    - tempLimit=107374182400
+    - averageEnqueueTime=5.0
+    stomp+ssl=
+    - memoryPercentUsage=0
+    - size=10
+    - tempUsage=0
+    - producerCount=1
+    - minEnqueueTime=5.0
+    dataDirectory=/Users/rajdavies/dev/projects/activemq/activemq-core/activemq-data
+    - enqueueCount=10
+    stomp=
+    - storeLimit=107374182400
+    - memoryLimit=67108864
+     */
+
+
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
index 9032496e8f..07eb137794 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
@@ -19,14 +19,24 @@
 
 package org.apache.james.queue.activemq;
 
+import java.time.Duration;
+
 import javax.inject.Inject;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
 
 import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
+import org.apache.james.metrics.api.GaugeRegistry;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,11 +47,14 @@ import reactor.core.scheduler.Schedulers;
 public class ActiveMQHealthCheck implements HealthCheck {
     public static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQHealthCheck.class);
     public static final ComponentName COMPONENT_NAME = new ComponentName("Embedded ActiveMQ");
+    private static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(1);
     private final ConnectionFactory connectionFactory;
+    private final ActiveMQBrokerStatistics brokerStatistics;
 
     @Inject
-    public ActiveMQHealthCheck(ConnectionFactory connectionFactory) {
+    public ActiveMQHealthCheck(ConnectionFactory connectionFactory, GaugeRegistry gaugeRegistry) {
         this.connectionFactory = connectionFactory;
+        this.brokerStatistics = new ActiveMQBrokerStatistics(gaugeRegistry);
     }
 
     @Override
@@ -53,15 +66,87 @@ public class ActiveMQHealthCheck implements HealthCheck {
     public Publisher<Result> check() {
         return Mono.fromCallable(() -> {
             try {
-                Connection connection = connectionFactory.createConnection();
-                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-                session.close();
+                retrieveAndUpdateBrokerStatistics();
                 return Result.healthy(COMPONENT_NAME);
             } catch (Exception e) {
                 LOGGER.warn("{} is unhealthy. {}", COMPONENT_NAME.getName(), e.getMessage());
-                return Result.unhealthy(COMPONENT_NAME, e.getMessage());
+                return Result.unhealthy(COMPONENT_NAME, e.toString(), e);
             }
         }).subscribeOn(Schedulers.boundedElastic());
     }
+
+    private void retrieveAndUpdateBrokerStatistics() throws JMSException {
+        Connection connection = null;
+        Session session = null;
+        TemporaryQueue replyTo = null;
+        MessageConsumer consumer = null;
+        MessageProducer producer = null;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            replyTo = session.createTemporaryQueue();
+            consumer = session.createConsumer(replyTo);
+
+            Queue testQueue = session.createQueue(ActiveMQBrokerStatistics.STATS_QUEUE_NAME);
+            producer = session.createProducer(testQueue);
+            Message msg = session.createMessage();
+            msg.setJMSReplyTo(replyTo);
+            producer.send(msg);
+
+            Message reply = consumer.receive(RECEIVE_TIMEOUT.toMillis());
+            if (reply == null) {
+                throw new JMSException("no message received, timed out after " + RECEIVE_TIMEOUT);
+            } else if (!(reply instanceof MapMessage)) {
+                throw new JMSException("expected MapMessage but got " + reply.getClass());
+            }
+            brokerStatistics.update((MapMessage)reply);
+        } finally {
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing producer", e);
+                }
+            }
+
+            if (consumer != null) {
+                try {
+                    consumer.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing consumer", e);
+                }
+            }
+
+            if (replyTo != null) {
+                try {
+                    // We need to delete the temporary queue to be sure we
+                    // free up memory. If that's not done and a pool is used,
+                    // it's possible that we will register a new MBean in JMX for
+                    // every TemporaryQueue, which will never get unregistered.
+                    replyTo.delete();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while deleting temporary queue", e);
+                }
+            }
+
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing session", e);
+                }
+            }
+
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing connection", e);
+                }
+            }
+        }
+    }
 }
 
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
index ffe22248f5..b0faadfa53 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
@@ -109,6 +109,7 @@ public class EmbeddedActiveMQ {
         brokerService.setPersistenceAdapter(persistenceAdapter);
         BrokerPlugin[] brokerPlugins = {new StatisticsBrokerPlugin()};
         brokerService.setPlugins(brokerPlugins);
+        brokerService.setEnableStatistics(true);
         String[] transportConnectorsURIs = {BROCKER_URI};
         brokerService.setTransportConnectorURIs(transportConnectorsURIs);
         brokerService.start();
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
index f54851a487..aef9d9a499 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
@@ -26,15 +26,19 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.james.core.healthcheck.Result;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.queue.jms.BrokerExtension;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import reactor.core.publisher.Mono;
 
+@Tag(BrokerExtension.STATISTICS)
 @ExtendWith(BrokerExtension.class)
 class ActiveMQHealthCheckTest {
+
     private ActiveMQHealthCheck testee;
     private BrokerService broker;
 
@@ -46,7 +50,7 @@ class ActiveMQHealthCheckTest {
         prefetchPolicy.setQueuePrefetch(0);
         connectionFactory.setPrefetchPolicy(prefetchPolicy);
 
-        testee = new ActiveMQHealthCheck(connectionFactory);
+        testee = new ActiveMQHealthCheck(connectionFactory, new NoopGaugeRegistry());
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org