Posted to notifications@james.apache.org by rc...@apache.org on 2022/12/01 05:08:53 UTC

[james-project] branch master updated (e8ff974754 -> a51cbe7214)

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from e8ff974754 JAMES-3756 JMAP Delegation extension specification (#1332)
     new 80d12a2edc [JAMES-3841] ActiveMQ: replace simple connection health check with retrieving ActiveMQ broker stats
     new cffb52de58 [JAMES-3841] ActiveMQ: restore simple connection health check and move collecting statistics into separate class (i.e. broker stats, per-queue stats) with periodic polling
     new 83b5b660d5 [JAMES-3841] ActiveMQ: fix test for JamesAppSpringMainTest
     new 6ae8df4106 [JAMES-3841] ActiveMQ: add SettableGauge to metrics api and use it in ActiveMQ metric collection
     new ba2a05d016 [JAMES-3841] ActiveMQ: allow configuring metric collection (i.e. conf/activemq.properties)
     new 67f6c610b3 [JAMES-3841] ActiveMQ: add license comment
     new fee5c90338 [JAMES-3841] ActiveMQ: add sample configuration for conf/activemq.properties
     new a51cbe7214 [JAMES-3841] ActiveMQ: add documentation for collecting metrics

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/james/metrics/api/GaugeRegistry.java    |   6 +
 .../james/metrics/api/NoopGaugeRegistry.java       |   5 +
 .../dropwizard/DropWizardGaugeRegistry.java        |   8 +
 .../sample-configuration/activemq.properties       |  18 ++
 server/container/guice/queue/activemq/pom.xml      |   4 +
 .../queue/activemq/ActiveMQQueueModule.java        |  37 +++++
 .../queue/activemq/ActiveMQConfiguration.java      |  34 ++--
 .../james/queue/activemq/ActiveMQHealthCheck.java  |   3 +-
 .../queue/activemq/ActiveMQMailQueueFactory.java   |  12 +-
 .../james/queue/activemq/EmbeddedActiveMQ.java     |   1 +
 .../activemq/metric/ActiveMQMetricCollector.java}  |  17 +-
 .../metric/ActiveMQMetricCollectorImpl.java        | 182 +++++++++++++++++++++
 .../metric/ActiveMQMetricCollectorNoop.java        |  19 ++-
 .../metric/ActiveMQMetricConfiguration.java        | 112 +++++++++++++
 .../queue/activemq/metric/ActiveMQMetrics.java     | 176 ++++++++++++++++++++
 .../META-INF/spring/activemq-queue-context.xml     |   3 +
 .../queue/activemq/ActiveMQHealthCheckTest.java    |   1 +
 .../activemq/ActiveMQMailQueueFactoryTest.java     |   8 +-
 .../metric/ActiveMQMetricCollectorTest.java        | 142 ++++++++++++++++
 .../metric/ActiveMQMetricConfigurationTest.java    |  64 ++++----
 src/site/site.xml                                  |   1 +
 src/site/xdoc/server/config-activemq.xml           |  91 +++++++++++
 src/site/xdoc/server/metrics.xml                   |   1 +
 23 files changed, 871 insertions(+), 74 deletions(-)
 create mode 100644 server/apps/cassandra-app/sample-configuration/activemq.properties
 copy protocols/api/src/main/java/org/apache/james/protocols/api/handler/ProtocolHandler.java => server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java (61%)
 copy server/{task/task-memory/src/main/java/org/apache/james/task/WorkQueue.java => queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollector.java} (81%)
 create mode 100644 server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
 copy mpt/core/src/main/java/org/apache/james/mpt/monitor/NullMonitor.java => server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorNoop.java (78%)
 create mode 100644 server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
 create mode 100644 server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetrics.java
 create mode 100644 server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
 copy mdn/src/test/java/org/apache/james/mdn/fields/OriginalMessageIdTest.java => server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java (50%)
 create mode 100644 src/site/xdoc/server/config-activemq.xml


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/08: [JAMES-3841] ActiveMQ: replace simple connection health check with retrieving ActiveMQ broker stats

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 80d12a2edc40596b3dc1c6c2961ac64e443e036a
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Wed Nov 2 23:55:04 2022 +0100

    [JAMES-3841] ActiveMQ: replace simple connection health check with retrieving ActiveMQ broker stats
---
 .../queue/activemq/ActiveMQBrokerStatistics.java   | 229 +++++++++++++++++++++
 .../james/queue/activemq/ActiveMQHealthCheck.java  |  95 ++++++++-
 .../james/queue/activemq/EmbeddedActiveMQ.java     |   1 +
 .../queue/activemq/ActiveMQHealthCheckTest.java    |   6 +-
 4 files changed, 325 insertions(+), 6 deletions(-)

diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
new file mode 100644
index 0000000000..e64ff380d9
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
@@ -0,0 +1,229 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+public class ActiveMQBrokerStatistics {
+
+    public static final String STATS_QUEUE_NAME = "ActiveMQ.Statistics.Broker";
+
+    private static final String TEMP_PERCENT_USAGE = "tempPercentUsage";
+    private static final String STORE_PERCENT_USAGE = "storePercentUsage";
+    private static final String MEMORY_PERCENT_USAGE = "memoryPercentUsage";
+
+    private static final String STORE_LIMIT = "storeLimit";
+    private static final String MEMORY_LIMIT = "memoryLimit";
+    private static final String TEMP_LIMIT = "tempLimit";
+
+    private static final String MEMORY_USAGE = "memoryUsage";
+    private static final String STORE_USAGE = "storeUsage";
+    private static final String TEMP_USAGE = "tempUsage";
+
+    private static final String SIZE = "size";
+    private static final String ENQUEUE_COUNT = "enqueueCount";
+    private static final String DEQUEUE_COUNT = "dequeueCount";
+    private static final String INFLIGHT_COUNT = "inflightCount";
+    private static final String PRODUCER_COUNT = "producerCount";
+    private static final String CONSUMER_COUNT = "consumerCount";
+    private static final String EXPIRED_COUNT = "expiredCount";
+    private static final String DISPATCH_COUNT = "dispatchCount";
+    private static final String MESSAGES_CACHED = "messagesCached";
+
+    private static final String MIN_ENQUEUE_TIME = "minEnqueueTime";
+    private static final String MAX_ENQUEUE_TIME = "maxEnqueueTime";
+    private static final String AVERAGE_ENQUEUE_TIME = "averageEnqueueTime";
+
+    private final AtomicLong lastUpdate = new AtomicLong();
+
+    private final AtomicInteger memoryPercentUsage = new AtomicInteger();
+    private final AtomicInteger storePercentUsage = new AtomicInteger();
+    private final AtomicInteger tempPercentUsage = new AtomicInteger();
+
+    private final AtomicLong memoryLimit = new AtomicLong();
+    private final AtomicLong storeLimit = new AtomicLong();
+    private final AtomicLong tempLimit = new AtomicLong();
+
+    private final AtomicLong memoryUsage = new AtomicLong();
+    private final AtomicLong storeUsage = new AtomicLong();
+    private final AtomicLong tempUsage = new AtomicLong();
+
+    private final AtomicLong size = new AtomicLong();
+    private final AtomicLong enqueueCount = new AtomicLong();
+    private final AtomicLong dequeueCount = new AtomicLong();
+    private final AtomicLong inflightCount = new AtomicLong();
+    private final AtomicLong producerCount = new AtomicLong();
+    private final AtomicLong consumerCount = new AtomicLong();
+    private final AtomicLong expiredCount = new AtomicLong();
+    private final AtomicLong dispatchCount = new AtomicLong();
+    private final AtomicLong messagesCached = new AtomicLong();
+
+    private final AtomicDouble minEnqueueTime = new AtomicDouble();
+    private final AtomicDouble maxEnqueueTime = new AtomicDouble();
+    private final AtomicDouble averageEnqueueTime = new AtomicDouble();
+
+    public ActiveMQBrokerStatistics(GaugeRegistry gaugeRegistry) {
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_PERCENT_USAGE, memoryPercentUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_PERCENT_USAGE, storePercentUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_PERCENT_USAGE, tempPercentUsage::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_LIMIT, memoryLimit::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_LIMIT, storeLimit::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_LIMIT, tempLimit::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_USAGE, memoryUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_USAGE, storeUsage::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_USAGE, tempUsage::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + SIZE, size::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + ENQUEUE_COUNT, enqueueCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + DEQUEUE_COUNT, dequeueCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + INFLIGHT_COUNT, inflightCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + PRODUCER_COUNT, producerCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + CONSUMER_COUNT, consumerCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + EXPIRED_COUNT, expiredCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + DISPATCH_COUNT, dispatchCount::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MESSAGES_CACHED, messagesCached::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MIN_ENQUEUE_TIME, minEnqueueTime::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MAX_ENQUEUE_TIME, maxEnqueueTime::get);
+        gaugeRegistry.register(STATS_QUEUE_NAME + "." + AVERAGE_ENQUEUE_TIME, averageEnqueueTime::get);
+
+        gaugeRegistry.register(STATS_QUEUE_NAME + ".lastUpdate", lastUpdate::get);
+    }
+
+    public void update(MapMessage msg) throws JMSException {
+        if (msg.itemExists(MEMORY_PERCENT_USAGE)) {
+            memoryPercentUsage.set(msg.getInt(MEMORY_PERCENT_USAGE));
+        }
+        if (msg.itemExists(STORE_PERCENT_USAGE)) {
+            storePercentUsage.set(msg.getInt(STORE_PERCENT_USAGE));
+        }
+        if (msg.itemExists(TEMP_PERCENT_USAGE)) {
+            tempPercentUsage.set(msg.getInt(TEMP_PERCENT_USAGE));
+        }
+
+        if (msg.itemExists(MEMORY_LIMIT)) {
+            memoryLimit.set(msg.getLong(MEMORY_LIMIT));
+        }
+        if (msg.itemExists(STORE_LIMIT)) {
+            storeLimit.set(msg.getLong(STORE_LIMIT));
+        }
+        if (msg.itemExists(TEMP_LIMIT)) {
+            tempLimit.set(msg.getLong(TEMP_LIMIT));
+        }
+
+        if (msg.itemExists(MEMORY_USAGE)) {
+            memoryUsage.set(msg.getLong(MEMORY_USAGE));
+        }
+        if (msg.itemExists(STORE_USAGE)) {
+            storeUsage.set(msg.getLong(STORE_USAGE));
+        }
+        if (msg.itemExists(TEMP_USAGE)) {
+            tempUsage.set(msg.getLong(TEMP_USAGE));
+        }
+
+        if (msg.itemExists(SIZE)) {
+            size.set(msg.getLong(SIZE));
+        }
+        if (msg.itemExists(ENQUEUE_COUNT)) {
+            enqueueCount.set(msg.getLong(ENQUEUE_COUNT));
+        }
+        if (msg.itemExists(DEQUEUE_COUNT)) {
+            dequeueCount.set(msg.getLong(DEQUEUE_COUNT));
+        }
+        if (msg.itemExists(INFLIGHT_COUNT)) {
+            inflightCount.set(msg.getLong(INFLIGHT_COUNT));
+        }
+        if (msg.itemExists(PRODUCER_COUNT)) {
+            producerCount.set(msg.getLong(PRODUCER_COUNT));
+        }
+        if (msg.itemExists(CONSUMER_COUNT)) {
+            consumerCount.set(msg.getLong(CONSUMER_COUNT));
+        }
+        if (msg.itemExists(EXPIRED_COUNT)) {
+            expiredCount.set(msg.getLong(EXPIRED_COUNT));
+        }
+        if (msg.itemExists(DISPATCH_COUNT)) {
+            dispatchCount.set(msg.getLong(DISPATCH_COUNT));
+        }
+        if (msg.itemExists(MESSAGES_CACHED)) {
+            messagesCached.set(msg.getLong(MESSAGES_CACHED));
+        }
+
+        if (msg.itemExists(MIN_ENQUEUE_TIME)) {
+            minEnqueueTime.set(msg.getDouble(MIN_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(MAX_ENQUEUE_TIME)) {
+            maxEnqueueTime.set(msg.getDouble(MAX_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(AVERAGE_ENQUEUE_TIME)) {
+            averageEnqueueTime.set(msg.getDouble(AVERAGE_ENQUEUE_TIME));
+        }
+
+        lastUpdate.set(System.currentTimeMillis());
+    }
+
+    public long getLastUpdate() {
+        return lastUpdate.get();
+    }
+
+    /*
+    vm=vm://localhost
+    - memoryUsage=0
+    - storeUsage=3330
+    - tempPercentUsage=0
+    ssl=
+    openwire=tcp://localhost:50059
+    brokerId=ID:bigmac-50057-1253605065511-0:0
+    - consumerCount=2
+    brokerName=localhost
+    - expiredCount=0
+    - dispatchCount=1
+    - maxEnqueueTime=5.0
+    - storePercentUsage=0
+    - dequeueCount=0
+    - inflightCount=1
+    - messagesCached=0
+    - tempLimit=107374182400
+    - averageEnqueueTime=5.0
+    stomp+ssl=
+    - memoryPercentUsage=0
+    - size=10
+    - tempUsage=0
+    - producerCount=1
+    - minEnqueueTime=5.0
+    dataDirectory=/Users/rajdavies/dev/projects/activemq/activemq-core/activemq-data
+    - enqueueCount=10
+    stomp=
+    - storeLimit=107374182400
+    - memoryLimit=67108864
+     */
+
+
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
index 9032496e8f..07eb137794 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
@@ -19,14 +19,24 @@
 
 package org.apache.james.queue.activemq;
 
+import java.time.Duration;
+
 import javax.inject.Inject;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TemporaryQueue;
 
 import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
+import org.apache.james.metrics.api.GaugeRegistry;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,11 +47,14 @@ import reactor.core.scheduler.Schedulers;
 public class ActiveMQHealthCheck implements HealthCheck {
     public static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQHealthCheck.class);
     public static final ComponentName COMPONENT_NAME = new ComponentName("Embedded ActiveMQ");
+    private static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(1);
     private final ConnectionFactory connectionFactory;
+    private final ActiveMQBrokerStatistics brokerStatistics;
 
     @Inject
-    public ActiveMQHealthCheck(ConnectionFactory connectionFactory) {
+    public ActiveMQHealthCheck(ConnectionFactory connectionFactory, GaugeRegistry gaugeRegistry) {
         this.connectionFactory = connectionFactory;
+        this.brokerStatistics = new ActiveMQBrokerStatistics(gaugeRegistry);
     }
 
     @Override
@@ -53,15 +66,87 @@ public class ActiveMQHealthCheck implements HealthCheck {
     public Publisher<Result> check() {
         return Mono.fromCallable(() -> {
             try {
-                Connection connection = connectionFactory.createConnection();
-                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
-                session.close();
+                retrieveAndUpdateBrokerStatistics();
                 return Result.healthy(COMPONENT_NAME);
             } catch (Exception e) {
                 LOGGER.warn("{} is unhealthy. {}", COMPONENT_NAME.getName(), e.getMessage());
-                return Result.unhealthy(COMPONENT_NAME, e.getMessage());
+                return Result.unhealthy(COMPONENT_NAME, e.toString(), e);
             }
         }).subscribeOn(Schedulers.boundedElastic());
     }
+
+    private void retrieveAndUpdateBrokerStatistics() throws JMSException {
+        Connection connection = null;
+        Session session = null;
+        TemporaryQueue replyTo = null;
+        MessageConsumer consumer = null;
+        MessageProducer producer = null;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            replyTo = session.createTemporaryQueue();
+            consumer = session.createConsumer(replyTo);
+
+            Queue testQueue = session.createQueue(ActiveMQBrokerStatistics.STATS_QUEUE_NAME);
+            producer = session.createProducer(testQueue);
+            Message msg = session.createMessage();
+            msg.setJMSReplyTo(replyTo);
+            producer.send(msg);
+
+            Message reply = consumer.receive(RECEIVE_TIMEOUT.toMillis());
+            if (reply == null) {
+                throw new JMSException("no message received, timed out after " + RECEIVE_TIMEOUT);
+            } else if (!(reply instanceof MapMessage)) {
+                throw new JMSException("expected MapMessage but got " + reply.getClass());
+            }
+            brokerStatistics.update((MapMessage)reply);
+        } finally {
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing producer", e);
+                }
+            }
+
+            if (consumer != null) {
+                try {
+                    consumer.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing consumer", e);
+                }
+            }
+
+            if (replyTo != null) {
+                try {
+                    // we need to delete the temporary queue to be sure we will
+                    // free up memory; if that's not done and a pool is used,
+                    // it's possible that we will register a new mbean in jmx for
+                    // every TemporaryQueue, which will never get unregistered
+                    replyTo.delete();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while deleting temporary queue", e);
+                }
+            }
+
+            if (session != null) {
+                try {
+                    session.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing session", e);
+                }
+            }
+
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (JMSException e) {
+                    LOGGER.warn("error while closing connection", e);
+                }
+            }
+        }
+    }
 }
 
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
index ffe22248f5..b0faadfa53 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
@@ -109,6 +109,7 @@ public class EmbeddedActiveMQ {
         brokerService.setPersistenceAdapter(persistenceAdapter);
         BrokerPlugin[] brokerPlugins = {new StatisticsBrokerPlugin()};
         brokerService.setPlugins(brokerPlugins);
+        brokerService.setEnableStatistics(true);
         String[] transportConnectorsURIs = {BROCKER_URI};
         brokerService.setTransportConnectorURIs(transportConnectorsURIs);
         brokerService.start();
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
index f54851a487..aef9d9a499 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
@@ -26,15 +26,19 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.james.core.healthcheck.Result;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.queue.jms.BrokerExtension;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import reactor.core.publisher.Mono;
 
+@Tag(BrokerExtension.STATISTICS)
 @ExtendWith(BrokerExtension.class)
 class ActiveMQHealthCheckTest {
+
     private ActiveMQHealthCheck testee;
     private BrokerService broker;
 
@@ -46,7 +50,7 @@ class ActiveMQHealthCheckTest {
         prefetchPolicy.setQueuePrefetch(0);
         connectionFactory.setPrefetchPolicy(prefetchPolicy);
 
-        testee = new ActiveMQHealthCheck(connectionFactory);
+        testee = new ActiveMQHealthCheck(connectionFactory, new NoopGaugeRegistry());
     }
 
     @Test
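
The statistics holder in this commit follows a simple pattern: each value lives in an atomic field, a gauge is registered once that reads the field, and update() overwrites only the keys present in the broker's reply. A minimal sketch of that pattern, using only the JDK, is shown here. SimpleGaugeRegistry and BrokerStatsSketch are hypothetical stand-ins (not the James GaugeRegistry API), and a plain Map replaces the JMS MapMessage, so this illustrates the idea rather than the committed implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

public class BrokerStatsSketch {

    // Hypothetical stand-in for a gauge registry: gauge name -> value supplier.
    public static class SimpleGaugeRegistry {
        private final Map<String, LongSupplier> gauges = new ConcurrentHashMap<>();

        public void register(String name, LongSupplier supplier) {
            gauges.put(name, supplier);
        }

        public long read(String name) {
            return gauges.get(name).getAsLong();
        }
    }

    public static final String PREFIX = "ActiveMQ.Statistics.Broker";

    // Gauges read these fields; update() is the only writer.
    private final AtomicLong size = new AtomicLong();
    private final AtomicLong enqueueCount = new AtomicLong();

    public BrokerStatsSketch(SimpleGaugeRegistry registry) {
        registry.register(PREFIX + ".size", size::get);
        registry.register(PREFIX + ".enqueueCount", enqueueCount::get);
    }

    // A plain Map stands in for the MapMessage reply (assumption for this sketch).
    // Keys missing from the reply leave the previous value untouched.
    public void update(Map<String, Long> reply) {
        if (reply.containsKey("size")) {
            size.set(reply.get("size"));
        }
        if (reply.containsKey("enqueueCount")) {
            enqueueCount.set(reply.get("enqueueCount"));
        }
    }

    public static void main(String[] args) {
        SimpleGaugeRegistry registry = new SimpleGaugeRegistry();
        BrokerStatsSketch stats = new BrokerStatsSketch(registry);

        Map<String, Long> first = new HashMap<>();
        first.put("size", 10L);
        first.put("enqueueCount", 10L);
        stats.update(first);

        // Second reply carries only "size"; enqueueCount keeps its old value.
        Map<String, Long> second = new HashMap<>();
        second.put("size", 3L);
        stats.update(second);

        System.out.println(registry.read(PREFIX + ".size"));         // prints 3
        System.out.println(registry.read(PREFIX + ".enqueueCount")); // prints 10
    }
}
```

Because the gauges only read atomics, a metrics reporter can sample them at any time without touching the broker; the cost of talking to ActiveMQ is paid only when update() is called.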




[james-project] 06/08: [JAMES-3841] ActiveMQ: add license comment

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 67f6c610b3b2e5c7dd91b79f4bb08e5d917c59d8
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Wed Nov 30 07:46:28 2022 +0100

    [JAMES-3841] ActiveMQ: add license comment
---
 .../james/queue/activemq/ActiveMQConfiguration.java    | 18 ++++++++++++++++++
 .../activemq/metric/ActiveMQMetricConfiguration.java   | 18 ++++++++++++++++++
 .../metric/ActiveMQMetricConfigurationTest.java        | 18 ++++++++++++++++++
 3 files changed, 54 insertions(+)

diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java
index f1b47780eb..0f1734e4cf 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java
@@ -1,3 +1,21 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
 package org.apache.james.queue.activemq;
 
 import org.apache.commons.configuration2.BaseConfiguration;
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
index 60381cc4b9..ecc3536329 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
@@ -1,3 +1,21 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
 package org.apache.james.queue.activemq.metric;
 
 import java.time.Duration;
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java
index 7c3e31448d..1e1f41aaa2 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java
@@ -1,3 +1,21 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
 package org.apache.james.queue.activemq.metric;
 
 import java.time.Duration;




[james-project] 05/08: [JAMES-3841] ActiveMQ: allow configuring metric collection (i.e. conf/activemq.properties)

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ba2a05d016cfac4a7154bd0b11830896befc42c0
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Tue Nov 29 12:53:00 2022 +0100

    [JAMES-3841] ActiveMQ: allow configuring metric collection (i.e. conf/activemq.properties)
---
 .../queue/activemq/ActiveMQQueueModule.java        | 23 ++++++
 .../queue/activemq/ActiveMQConfiguration.java      | 26 ++++++
 .../metric/ActiveMQMetricCollectorImpl.java        | 31 ++++---
 .../metric/ActiveMQMetricConfiguration.java        | 94 ++++++++++++++++++++++
 .../metric/ActiveMQMetricCollectorTest.java        | 14 ++--
 .../metric/ActiveMQMetricConfigurationTest.java    | 72 +++++++++++++++++
 6 files changed, 241 insertions(+), 19 deletions(-)

diff --git a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
index da56c98306..2113c3cf47 100644
--- a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
+++ b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
@@ -19,11 +19,16 @@
 
 package org.apache.james.modules.queue.activemq;
 
+import java.io.FileNotFoundException;
+
 import javax.jms.ConnectionFactory;
 
 import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.core.healthcheck.HealthCheck;
+import org.apache.james.queue.activemq.ActiveMQConfiguration;
 import org.apache.james.queue.activemq.ActiveMQHealthCheck;
 import org.apache.james.queue.activemq.ActiveMQMailQueueFactory;
 import org.apache.james.queue.activemq.EmbeddedActiveMQ;
@@ -34,6 +39,9 @@ import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
+import org.apache.james.utils.PropertiesProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
@@ -44,6 +52,9 @@ import com.google.inject.multibindings.ProvidesIntoSet;
 
 public class ActiveMQQueueModule extends AbstractModule {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQQueueModule.class);
+    private static final String FILENAME = "activemq";
+
     @Override
     protected void configure() {
         bind(PersistenceAdapter.class).to(KahaDBPersistenceAdapter.class);
@@ -82,6 +93,18 @@ public class ActiveMQQueueModule extends AbstractModule {
         return queueFactory;
     }
 
+    @Singleton
+    @Provides
+    ActiveMQConfiguration activeMQConfiguration(PropertiesProvider propertiesProvider) throws ConfigurationException {
+        try {
+            Configuration configuration = propertiesProvider.getConfigurations(FILENAME);
+            return ActiveMQConfiguration.from(configuration);
+        } catch (FileNotFoundException e) {
+            LOGGER.warn("Could not find {} configuration file, using default configuration", FILENAME);
+            return ActiveMQConfiguration.getDefault();
+        }
+    }
+
     @ProvidesIntoSet
     InitializationOperation configureMetricCollector(ActiveMQMetricCollector metricCollector) {
         return InitilizationOperationBuilder
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java
new file mode 100644
index 0000000000..f1b47780eb
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQConfiguration.java
@@ -0,0 +1,26 @@
+package org.apache.james.queue.activemq;
+
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.apache.commons.configuration2.Configuration;
+import org.apache.james.queue.activemq.metric.ActiveMQMetricConfiguration;
+
+public class ActiveMQConfiguration {
+
+    private final ActiveMQMetricConfiguration metricConfiguration;
+
+    public static ActiveMQConfiguration getDefault() {
+        return from(new BaseConfiguration());
+    }
+
+    public static ActiveMQConfiguration from(Configuration configuration) {
+        return new ActiveMQConfiguration(ActiveMQMetricConfiguration.from(configuration));
+    }
+
+    private ActiveMQConfiguration(ActiveMQMetricConfiguration metricConfiguration) {
+        this.metricConfiguration = metricConfiguration;
+    }
+
+    public ActiveMQMetricConfiguration getMetricConfiguration() {
+        return metricConfiguration;
+    }
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
index d4ed4856b0..fc002990c9 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.queue.activemq.metric;
 
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -38,6 +37,7 @@ import javax.jms.TemporaryQueue;
 
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.queue.activemq.ActiveMQConfiguration;
 import org.apache.james.queue.api.MailQueueName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,11 +53,7 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQMetricCollectorImpl.class);
 
-    public static final Duration REFRESH_DELAY = Duration.ofSeconds(2);
-    public static final Duration REFRESH_INTERVAL = Duration.ofSeconds(5);
-    public static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(1);
-    public static final Duration REFRESH_TIMEOUT = RECEIVE_TIMEOUT.multipliedBy(2);
-
+    private final ActiveMQMetricConfiguration config;
     private final ConnectionFactory connectionFactory;
     private final MetricFactory metricFactory;
     private final GaugeRegistry gaugeRegistry;
@@ -67,7 +63,8 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
     private Disposable disposable;
 
     @Inject
-    public ActiveMQMetricCollectorImpl(ConnectionFactory connectionFactory, MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
+    public ActiveMQMetricCollectorImpl(ActiveMQConfiguration activeMQConfiguration, ConnectionFactory connectionFactory, MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
+        this.config = activeMQConfiguration.getMetricConfiguration();
         this.connectionFactory = connectionFactory;
         this.metricFactory = metricFactory;
         this.gaugeRegistry = gaugeRegistry;
@@ -84,7 +81,7 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
     }
 
     private void collectStatistics(ActiveMQMetrics statistics) {
-        if (!registeredStatistics.containsKey(statistics.getName())) {
+        if (config.isEnabled() && !registeredStatistics.containsKey(statistics.getName())) {
             LOGGER.info("collecting statistics for {}", statistics.getName());
             registeredStatistics.put(statistics.getName(), statistics);
         }
@@ -92,13 +89,20 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
 
     @Override
     public void start() {
+        if (!config.isEnabled()) {
+            LOGGER.info("collecting statistics disabled");
+            return;
+        }
+
         collectBrokerStatistics();
 
-        LOGGER.info("start delay={} interval={}", REFRESH_DELAY, REFRESH_INTERVAL);
-        disposable = Flux.interval(REFRESH_DELAY, REFRESH_INTERVAL)
+        LOGGER.info("start delay={} interval={} timeout={} aqmp_timeout={}",
+            config.getStartDelay(), config.getInterval(), config.getTimeout(), config.getAqmpTimeout());
+
+        disposable = Flux.interval(config.getStartDelay(), config.getInterval())
             .flatMap(any -> Flux.fromStream(() -> registeredStatistics.values().stream())
                 .flatMap((s) -> {
-                    Mono<Void> task = Mono.fromCallable(() -> fetchAndUpdate(s)).timeout(REFRESH_TIMEOUT);
+                    Mono<Void> task = Mono.fromCallable(() -> fetchAndUpdate(s)).timeout(config.getTimeout());
                     return metricFactory.decoratePublisherWithTimerMetric(s.getName() + "._time", task);
                 })
             )
@@ -139,9 +143,10 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
             msg.setJMSReplyTo(replyTo);
             producer.send(msg);
 
-            Message reply = consumer.receive(RECEIVE_TIMEOUT.toMillis());
+            long timeoutMs = config.getAqmpTimeout().toMillis();
+            Message reply = consumer.receive(timeoutMs);
             if (reply == null) {
-                throw new JMSException("no message received, timed out after " + RECEIVE_TIMEOUT);
+                throw new JMSException("no message received, timed out after " + timeoutMs + " ms");
             } else if (!(reply instanceof MapMessage)) {
                 throw new JMSException("expected MapMessage but got " + reply.getClass());
             }
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
new file mode 100644
index 0000000000..60381cc4b9
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfiguration.java
@@ -0,0 +1,94 @@
+package org.apache.james.queue.activemq.metric;
+
+import java.time.Duration;
+import java.util.Optional;
+
+import org.apache.commons.configuration2.Configuration;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.james.util.DurationParser;
+
+import com.google.common.base.Preconditions;
+
+public class ActiveMQMetricConfiguration {
+    private static final String ENABLED = "metrics.enabled";
+    private static final String START_DELAY = "metrics.start_delay";
+    private static final String INTERVAL = "metrics.interval";
+    private static final String TIMEOUT  = "metrics.timeout";
+    private static final String AQMP_TIMEOUT  = "metrics.aqmp_timeout";
+
+    private static final Duration MINIMAL_START_DELAY = Duration.ofSeconds(1);
+    private static final Duration MINIMAL_INTERVAL = Duration.ofSeconds(5);
+    private static final Duration MINIMAL_TIMEOUT = Duration.ofSeconds(2);
+    private static final Duration MINIMAL_AQMP_TIMEOUT = Duration.ofSeconds(1);
+
+    private final boolean enabled;
+    private final Duration startDelay;
+    private final Duration interval;
+    private final Duration timeout;
+    private final Duration aqmpTimeout;
+
+    public static ActiveMQMetricConfiguration from(Configuration configuration) {
+        return new ActiveMQMetricConfiguration(
+            configuration.getBoolean(ENABLED, true),
+            getDurationFromConfiguration(configuration, START_DELAY).orElse(MINIMAL_START_DELAY),
+            getDurationFromConfiguration(configuration, INTERVAL).orElse(MINIMAL_INTERVAL),
+            getDurationFromConfiguration(configuration, TIMEOUT).orElse(MINIMAL_TIMEOUT),
+            getDurationFromConfiguration(configuration, AQMP_TIMEOUT).orElse(MINIMAL_AQMP_TIMEOUT)
+        );
+    }
+
+    public ActiveMQMetricConfiguration(boolean enabled, Duration startDelay, Duration interval,
+        Duration timeout, Duration aqmpTimeout) {
+        this.enabled = enabled;
+        this.startDelay = startDelay;
+        this.interval = interval;
+        this.timeout = timeout;
+        this.aqmpTimeout = aqmpTimeout;
+        checkConfiguration();
+    }
+
+    private void checkConfiguration() {
+        Preconditions.checkArgument(startDelay.compareTo(MINIMAL_START_DELAY) >= 0,
+            "'%s' must be equal or greater than %s ms",
+            START_DELAY, MINIMAL_START_DELAY.toMillis());
+        Preconditions.checkArgument(interval.compareTo(MINIMAL_INTERVAL) >= 0,
+            "'%s' must be equal or greater than %s ms",
+            INTERVAL, MINIMAL_INTERVAL.toMillis());
+        Preconditions.checkArgument(timeout.compareTo(MINIMAL_TIMEOUT) >= 0,
+            "'%s' must be equal or greater than %s ms",
+            TIMEOUT, MINIMAL_TIMEOUT.toMillis());
+        Preconditions.checkArgument(aqmpTimeout.compareTo(MINIMAL_AQMP_TIMEOUT) >= 0,
+            "'%s' must be equal or greater than %s ms",
+            AQMP_TIMEOUT, MINIMAL_AQMP_TIMEOUT.toMillis());
+
+        Preconditions.checkArgument(interval.compareTo(timeout) > 0,
+            "'%s' must be less than '%s'", TIMEOUT, INTERVAL);
+        Preconditions.checkArgument(timeout.compareTo(aqmpTimeout) > 0,
+            "'%s' must be less than '%s'", AQMP_TIMEOUT, TIMEOUT);
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public Duration getStartDelay() {
+        return startDelay;
+    }
+
+    public Duration getInterval() {
+        return interval;
+    }
+
+    public Duration getTimeout() {
+        return timeout;
+    }
+
+    public Duration getAqmpTimeout() {
+        return aqmpTimeout;
+    }
+
+    private static Optional<Duration> getDurationFromConfiguration(Configuration configuration, String key) {
+        return StringUtils.isEmpty(configuration.getString(key))
+            ? Optional.empty() : Optional.of(DurationParser.parse(configuration.getString(key)));
+    }
+}
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
index d1d02dea2e..5b3d9d3312 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
@@ -22,7 +22,6 @@ package org.apache.james.queue.activemq.metric;
 import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Supplier;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -37,6 +36,7 @@ import org.apache.james.metrics.api.Gauge;
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.apache.james.queue.activemq.ActiveMQConfiguration;
 import org.apache.james.queue.api.MailQueueName;
 import org.apache.james.queue.jms.BrokerExtension;
 import org.junit.jupiter.api.BeforeAll;
@@ -49,6 +49,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 class ActiveMQMetricCollectorTest {
 
     private static ActiveMQConnectionFactory connectionFactory;
+    private static final ActiveMQConfiguration EMPTY_CONFIGURATION = ActiveMQConfiguration.getDefault();
 
     @BeforeAll
     static void setup(BrokerService broker) {
@@ -61,7 +62,7 @@ class ActiveMQMetricCollectorTest {
     @Test
     void shouldFailToFetchAndUpdateStatisticsForUnknownQueue() {
         SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
-        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
         ActiveMQMetrics queueStatistics = ActiveMQMetrics.forQueue("UNKNOWN", gaugeRegistry);
 
         assertThatThrownBy(() -> testee.fetchAndUpdate(queueStatistics))
@@ -73,7 +74,7 @@ class ActiveMQMetricCollectorTest {
     @Test
     void shouldFetchAndUpdateBrokerStatistics() throws Exception {
         SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
-        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
         ActiveMQMetrics brokerStatistics = ActiveMQMetrics.forBroker(gaugeRegistry);
 
         long notBefore = System.currentTimeMillis();
@@ -86,7 +87,7 @@ class ActiveMQMetricCollectorTest {
     @Test
     void shouldFetchAndUpdateBrokerStatisticsInGaugeRegistry() throws Exception {
         SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
-        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
         ActiveMQMetrics brokerStatistics = ActiveMQMetrics.forBroker(gaugeRegistry);
 
         testee.fetchAndUpdate(brokerStatistics);
@@ -100,12 +101,13 @@ class ActiveMQMetricCollectorTest {
     void hasExecutionTimeMetrics() {
         RecordingMetricFactory metricFactory = new RecordingMetricFactory();
         NoopGaugeRegistry gaugeRegistry = new NoopGaugeRegistry();
-        ActiveMQMetricCollector testee = new ActiveMQMetricCollectorImpl(connectionFactory, metricFactory, gaugeRegistry);
+        ActiveMQMetricCollector testee = new ActiveMQMetricCollectorImpl(EMPTY_CONFIGURATION, connectionFactory, metricFactory, gaugeRegistry);
         testee.start();
         testee.collectBrokerStatistics();
         testee.collectQueueStatistics(MailQueueName.of("UNKNOWN"));
 
-        Integer executionTimeCount = Flux.interval(ActiveMQMetricCollectorImpl.REFRESH_DELAY, Duration.ofSeconds(1))
+        Duration startDelay = EMPTY_CONFIGURATION.getMetricConfiguration().getStartDelay();
+        Integer executionTimeCount = Flux.interval(startDelay, Duration.ofSeconds(1))
             .take(3,true)
             .flatMap(n -> Mono.fromCallable(() -> metricFactory.executionTimesForPrefixName("ActiveMQ.").size()))
             .blockLast();
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java
new file mode 100644
index 0000000000..7c3e31448d
--- /dev/null
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricConfigurationTest.java
@@ -0,0 +1,72 @@
+package org.apache.james.queue.activemq.metric;
+
+import java.time.Duration;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.apache.commons.configuration2.BaseConfiguration;
+import org.junit.jupiter.api.Test;
+
+public class ActiveMQMetricConfigurationTest {
+
+    @Test
+    void shouldUseDefaultForEmptyConfiguration() {
+        assertThat(ActiveMQMetricConfiguration.from(new BaseConfiguration()))
+            .isNotNull();
+    }
+
+    @Test
+    void shouldNotFailForValidConfiguration() {
+        assertThat(getSampleConfig(1,10,4,3))
+            .isNotNull();
+    }
+
+    @Test
+    void shouldThrowWhenStartDelayIsLessThanMinimal() {
+        assertThatThrownBy(() -> getSampleConfig(0,10,3,3))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowWhenIntervalIsLessThanMinimal() {
+        assertThatThrownBy(() -> getSampleConfig(1,1,3,3))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowWhenTimeoutIsLessThanMinimal() {
+        assertThatThrownBy(() -> getSampleConfig(1,10,1,3))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowWhenAqmpTimeoutIsLessThanMinimal() {
+        assertThatThrownBy(() -> getSampleConfig(1,10,3,0))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowWhenIntervalIsLessThanTimeout() {
+        assertThatThrownBy(() -> getSampleConfig(1,5,10,2))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowWhenTimeoutIsLessThanAqmpTimeout() {
+        assertThatThrownBy(() -> getSampleConfig(1,10,3,5))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void shouldThrowWhenIntervalIsLessThanAqmpTimeout() {
+        assertThatThrownBy(() -> getSampleConfig(1,5,10,9))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    private ActiveMQMetricConfiguration getSampleConfig(int startDelaySec, int intervalSec, int timeoutSec, int aqmpTimeoutSec) {
+        return new ActiveMQMetricConfiguration(true,
+            Duration.ofSeconds(startDelaySec), Duration.ofSeconds(intervalSec),
+            Duration.ofSeconds(timeoutSec), Duration.ofSeconds(aqmpTimeoutSec));
+    }
+
+}
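
The ordering rules enforced by checkConfiguration() in this commit (each value at least its minimum, and interval > timeout > aqmp_timeout) can be tried out without James or Guava on the classpath. The following is only a minimal JDK-only sketch: the class name MetricTimingRules and its methods are hypothetical and not part of the commit, while the minimum values mirror the constants in the diff above.

```java
import java.time.Duration;

// Hypothetical, JDK-only illustration of the constraints in ActiveMQMetricConfiguration.
final class MetricTimingRules {
    static final Duration MIN_START_DELAY = Duration.ofSeconds(1);
    static final Duration MIN_INTERVAL = Duration.ofSeconds(5);
    static final Duration MIN_TIMEOUT = Duration.ofSeconds(2);
    static final Duration MIN_AQMP_TIMEOUT = Duration.ofSeconds(1);

    // Throws IllegalArgumentException on the first violated rule.
    static void validate(Duration startDelay, Duration interval, Duration timeout, Duration aqmpTimeout) {
        atLeast("metrics.start_delay", startDelay, MIN_START_DELAY);
        atLeast("metrics.interval", interval, MIN_INTERVAL);
        atLeast("metrics.timeout", timeout, MIN_TIMEOUT);
        atLeast("metrics.aqmp_timeout", aqmpTimeout, MIN_AQMP_TIMEOUT);
        // Strict ordering: equal values are rejected as well.
        require(interval.compareTo(timeout) > 0, "'metrics.timeout' must be less than 'metrics.interval'");
        require(timeout.compareTo(aqmpTimeout) > 0, "'metrics.aqmp_timeout' must be less than 'metrics.timeout'");
    }

    private static void atLeast(String key, Duration value, Duration minimum) {
        require(value.compareTo(minimum) >= 0,
            "'" + key + "' must be equal or greater than " + minimum.toMillis() + " ms");
    }

    private static void require(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    // Convenience wrapper taking seconds, similar in spirit to getSampleConfig in the test above.
    static boolean isValid(long startDelaySec, long intervalSec, long timeoutSec, long aqmpTimeoutSec) {
        try {
            validate(Duration.ofSeconds(startDelaySec), Duration.ofSeconds(intervalSec),
                Duration.ofSeconds(timeoutSec), Duration.ofSeconds(aqmpTimeoutSec));
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }
}
```

Under these rules the defaults (1s, 5s, 2s, 1s) are accepted, whereas a timeout equal to the aqmp timeout is rejected because the comparison is strict.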




[james-project] 08/08: [JAMES-3841] ActiveMQ: add documentation for collecting metrics

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a51cbe72147a76bd9f2af4f8e827bb4a13aa1fcc
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Wed Nov 30 09:10:17 2022 +0100

    [JAMES-3841] ActiveMQ: add documentation for collecting metrics
---
 src/site/site.xml                        |  1 +
 src/site/xdoc/server/config-activemq.xml | 91 ++++++++++++++++++++++++++++++++
 src/site/xdoc/server/metrics.xml         |  1 +
 3 files changed, 93 insertions(+)

diff --git a/src/site/site.xml b/src/site/site.xml
index 255feb63e3..4e06cd0612 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -110,6 +110,7 @@
 
                     </item>
                     <item name="4. Configure James" href="/server/config.html" collapse="true" >
+                        <item name="ActiveMQ" href="/server/config-activemq.html" />
                         <item name="Additional mailbox listeners" href="/server/config-listeners.html" />
                         <item name="Anti Spam" href="/server/config-antispam.html" />
                         <item name="Blob Export" href="/server/config-blob-export.html" />
diff --git a/src/site/xdoc/server/config-activemq.xml b/src/site/xdoc/server/config-activemq.xml
new file mode 100644
index 0000000000..dfa366bc56
--- /dev/null
+++ b/src/site/xdoc/server/config-activemq.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<document>
+
+  <properties>
+    <title>Apache James Server 3 - ActiveMQ Configuration</title>
+  </properties>
+
+  <body>
+    <section name="ActiveMQ Configuration">
+
+      <p>Consult <a href="https://github.com/apache/james-project/blob/master/server/apps/cassandra-app/sample-configuration/activemq.properties">activemq.properties</a> in the Git repository for examples and hints.</p>
+
+      <p>
+        Use this configuration to configure ActiveMQ AQMP. It is only applicable with Guice products.
+      </p>
+
+    </section>
+
+
+    <section name="ActiveMQ Metrics Configuration">
+      <p>
+        ActiveMQ exposes metrics through its statistics plugin
+        (see https://activemq.apache.org/statisticsplugin.html):
+      </p>
+      <ul>
+        <li>
+          Broker statistics: metrics about the broker itself such as <em>memoryUsage</em>,
+          <em>maxEnqueueTime</em>, <em>consumerCount</em> etc. Those metrics can be used for
+          monitoring and alerting (e.g. Prometheus).
+        </li>
+        <li>
+          Queue statistics: statistics are also collected for each created queue. Usually,
+          the queues <em>spool</em> and <em>outgoing</em> are available, but others may exist.
+          With multiple RemoteDelivery mailets in place, multiple <em>outgoing-*</em> queues
+          can be available.
+        </li>
+      </ul>
+      <p>
+        Basically, the metric collector sends an AQMP message to ActiveMQ requesting the statistics
+        mentioned above. ActiveMQ responds with a message containing those statistics.
+      </p>
+
+      <p>
+        Metrics Configuration
+      </p>
+      <dl>
+        <dt><strong>metrics.enabled</strong></dt>
+        <dd>Enable collecting metrics (default: true)</dd>
+
+        <dt><strong>metrics.start_delay (default: 1s, minimum: 1s)</strong></dt>
+        <dd>Delay before metric collection starts once James has started. You may increase this
+          value if the embedded ActiveMQ takes a long time to start.</dd>
+
+        <dt><strong>metrics.interval (default: 5s, minimum: 5s)</strong></dt>
+        <dd>Interval at which the metrics are collected.</dd>
+
+        <dt><strong>metrics.timeout (default: 2s, minimum: 2s)</strong></dt>
+        <dd>Timeout used by the collector to wait for an ActiveMQ response to arrive. This timeout
+        acts as a safeguard if the AQMP timeout (i.e. receive timeout) is not honored (see below).</dd>
+
+        <dt><strong>metrics.aqmp_timeout (default: 1s, minimum: 1s)</strong></dt>
+        <dd>AQMP timeout (i.e. receive timeout) used by the collector when waiting for the reply to
+          a statistics request. If the prefetch policy (ActiveMQ connection option) is zero, then the
+          receive timeout is indefinite. Thus, the option <em>metrics.timeout</em> provides an
+          additional timeout.
+          <br />
+          When James is not under heavy load, requesting and receiving statistics takes 50-80 ms (mean).
+        </dd>
+        </dd>
+      </dl>
+    </section>
+
+  </body>
+</document>
\ No newline at end of file
diff --git a/src/site/xdoc/server/metrics.xml b/src/site/xdoc/server/metrics.xml
index 42f378ea66..a37e501bc5 100644
--- a/src/site/xdoc/server/metrics.xml
+++ b/src/site/xdoc/server/metrics.xml
@@ -82,6 +82,7 @@
                     <li>Per mailet and per matcher Response time percentiles</li>
                     <li>Diverse Response time percentiles, counts and rates for DNS</li>
                     <li>Cassandra Java driver metrics</li>
+                    <li>ActiveMQ broker/queue statistics</li>
                     <li>Tika HTTP client statistics</li>
                     <li>SpamAssassin TCP client statistics</li>
                     <li>Mailbox listeners statistics time percentiles</li>
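
The documentation above describes metrics.timeout as an outer guard around a blocking receive whose own timeout may not be honored. The collector in this series uses Reactor's Mono.timeout for that purpose; the sketch below swaps in a plain JDK executor to show the same idea without extra dependencies. The class GuardedFetch and its methods are hypothetical names chosen for illustration only.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration: an outer timeout wrapped around a blocking call.
final class GuardedFetch {

    // Runs the blocking call on a worker thread and gives up after outerTimeout.
    static <T> T callWithTimeout(Callable<T> blockingCall, Duration outerTimeout) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<T> future = executor.submit(blockingCall);
            try {
                return future.get(outerTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Interrupt the worker so it does not keep blocking after the caller gave up.
                future.cancel(true);
                throw e;
            }
        } finally {
            executor.shutdownNow();
        }
    }

    // Simulates a receive that blocks for callDuration and reports whether the guard fired.
    static boolean timesOut(Duration callDuration, Duration outerTimeout) {
        try {
            callWithTimeout(() -> {
                Thread.sleep(callDuration.toMillis());
                return "stats";
            }, outerTimeout);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this shape, a receive that never returns still releases the caller after the outer timeout, which is presumably why the configuration requires metrics.timeout to be greater than metrics.aqmp_timeout.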




[james-project] 04/08: [JAMES-3841] ActiveMQ: add SettableGauge to metrics api and use it in ActiveMQ metric collection

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6ae8df410641c34d0851394fde08ec8cefdd6bb7
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Tue Nov 29 01:02:05 2022 +0100

    [JAMES-3841] ActiveMQ: add SettableGauge to metrics api and use it in ActiveMQ metric collection
---
 .../apache/james/metrics/api/GaugeRegistry.java    |   6 +
 .../james/metrics/api/NoopGaugeRegistry.java       |   5 +
 .../dropwizard/DropWizardGaugeRegistry.java        |   8 +
 .../activemq/metric/ActiveMQBrokerStatistics.java  | 106 -------------
 .../metric/ActiveMQMetricCollectorImpl.java        |  13 +-
 .../queue/activemq/metric/ActiveMQMetrics.java     | 176 +++++++++++++++++++++
 .../activemq/metric/ActiveMQQueueStatistics.java   | 155 ------------------
 .../queue/activemq/metric/ActiveMQStatistics.java  |  34 ----
 .../metric/ActiveMQMetricCollectorTest.java        |  43 +++--
 9 files changed, 227 insertions(+), 319 deletions(-)

diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/GaugeRegistry.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/GaugeRegistry.java
index d35062749f..6d2c6d3c21 100644
--- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/GaugeRegistry.java
+++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/GaugeRegistry.java
@@ -20,5 +20,11 @@
 package org.apache.james.metrics.api;
 
 public interface GaugeRegistry {
+    interface SettableGauge<T> {
+        void setValue(T t);
+    }
+
     <T> GaugeRegistry register(String name, Gauge<T> gauge);
+
+    <T> SettableGauge<T> settableGauge(String name);
 }
diff --git a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopGaugeRegistry.java b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopGaugeRegistry.java
index 80fe43badc..07cb98b167 100644
--- a/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopGaugeRegistry.java
+++ b/metrics/metrics-api/src/main/java/org/apache/james/metrics/api/NoopGaugeRegistry.java
@@ -24,4 +24,9 @@ public class NoopGaugeRegistry implements GaugeRegistry {
     public <T> GaugeRegistry register(String name, Gauge<T> gauge) {
         return this;
     }
+
+    @Override
+    public <T> SettableGauge<T> settableGauge(String name) {
+        return t -> { };
+    }
 }
diff --git a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardGaugeRegistry.java b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardGaugeRegistry.java
index f79e2832ee..7f858a4818 100644
--- a/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardGaugeRegistry.java
+++ b/metrics/metrics-dropwizard/src/main/java/org/apache/james/metrics/dropwizard/DropWizardGaugeRegistry.java
@@ -25,6 +25,7 @@ import javax.inject.Inject;
 import org.apache.james.metrics.api.Gauge;
 import org.apache.james.metrics.api.GaugeRegistry;
 
+import com.codahale.metrics.DefaultSettableGauge;
 import com.codahale.metrics.MetricRegistry;
 
 public class DropWizardGaugeRegistry implements GaugeRegistry {
@@ -45,4 +46,11 @@ public class DropWizardGaugeRegistry implements GaugeRegistry {
     public void shutDown() {
         metricRegistry.getGauges().keySet().forEach(metricRegistry::remove);
     }
+
+    @Override
+    public <T> SettableGauge<T> settableGauge(String name) {
+        DefaultSettableGauge<T> gauge = new DefaultSettableGauge<>();
+        metricRegistry.register(name, gauge);
+        return gauge::setValue;
+    }
 }
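
The SettableGauge added above lets a caller push values into a gauge, in contrast to a gauge that the registry polls for its value. A minimal in-memory sketch of that push style follows. InMemoryGauges is a hypothetical stand-in, not the James or Dropwizard implementation, and the gauge name used in the usage note is an assumption.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for a gauge registry with settable gauges.
final class InMemoryGauges {

    // Mirrors the shape of GaugeRegistry.SettableGauge from the diff above.
    interface SettableGauge<T> {
        void setValue(T value);
    }

    private final Map<String, AtomicReference<Object>> values = new ConcurrentHashMap<>();

    // Returns a setter bound to the named slot; the registry keeps only the latest value.
    <T> SettableGauge<T> settableGauge(String name) {
        AtomicReference<Object> slot = values.computeIfAbsent(name, key -> new AtomicReference<>());
        return slot::set;
    }

    // Reads the last value pushed for the name, or null if nothing has been set yet.
    Object currentValue(String name) {
        AtomicReference<Object> slot = values.get(name);
        return slot == null ? null : slot.get();
    }
}
```

A collector could obtain a gauge once, for example gauges.settableGauge("ActiveMQ.memoryUsage"), and call setValue on it each time fresh statistics arrive.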
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQBrokerStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQBrokerStatistics.java
deleted file mode 100644
index e7ae8acbd3..0000000000
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQBrokerStatistics.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.queue.activemq.metric;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-
-import org.apache.james.metrics.api.GaugeRegistry;
-
-public class ActiveMQBrokerStatistics extends ActiveMQQueueStatistics {
-    public static final String TEMP_PERCENT_USAGE = "tempPercentUsage";
-    public static final String STORE_PERCENT_USAGE = "storePercentUsage";
-    public static final String MEMORY_PERCENT_USAGE = "memoryPercentUsage";
-
-    public static final String STORE_LIMIT = "storeLimit";
-    public static final String TEMP_LIMIT = "tempLimit";
-
-    public static final String MEMORY_USAGE = "memoryUsage";
-    public static final String STORE_USAGE = "storeUsage";
-    public static final String TEMP_USAGE = "tempUsage";
-
-    private final AtomicInteger memoryPercentUsage = new AtomicInteger();
-    private final AtomicInteger storePercentUsage = new AtomicInteger();
-    private final AtomicInteger tempPercentUsage = new AtomicInteger();
-
-    private final AtomicLong storeLimit = new AtomicLong();
-    private final AtomicLong tempLimit = new AtomicLong();
-
-    private final AtomicLong memoryUsage = new AtomicLong();
-    private final AtomicLong storeUsage = new AtomicLong();
-    private final AtomicLong tempUsage = new AtomicLong();
-
-    public ActiveMQBrokerStatistics() {
-        super("ActiveMQ.Statistics.Broker");
-    }
-
-    @Override
-    public void registerMetrics(GaugeRegistry gaugeRegistry) {
-        super.registerMetrics(gaugeRegistry);
-
-        String prefix = getName() + ".";
-        gaugeRegistry.register(prefix + MEMORY_PERCENT_USAGE, memoryPercentUsage::get);
-        gaugeRegistry.register(prefix + STORE_PERCENT_USAGE, storePercentUsage::get);
-        gaugeRegistry.register(prefix + TEMP_PERCENT_USAGE, tempPercentUsage::get);
-
-        gaugeRegistry.register(prefix + STORE_LIMIT, storeLimit::get);
-        gaugeRegistry.register(prefix + TEMP_LIMIT, tempLimit::get);
-
-        gaugeRegistry.register(prefix + MEMORY_USAGE, memoryUsage::get);
-        gaugeRegistry.register(prefix + STORE_USAGE, storeUsage::get);
-        gaugeRegistry.register(prefix + TEMP_USAGE, tempUsage::get);
-    }
-
-    @Override
-    public void update(MapMessage msg) throws JMSException {
-        super.update(msg);
-
-        if (msg.itemExists(MEMORY_PERCENT_USAGE)) {
-            memoryPercentUsage.set(msg.getInt(MEMORY_PERCENT_USAGE));
-        }
-        if (msg.itemExists(STORE_PERCENT_USAGE)) {
-            storePercentUsage.set(msg.getInt(STORE_PERCENT_USAGE));
-        }
-        if (msg.itemExists(TEMP_PERCENT_USAGE)) {
-            tempPercentUsage.set(msg.getInt(TEMP_PERCENT_USAGE));
-        }
-
-        if (msg.itemExists(STORE_LIMIT)) {
-            storeLimit.set(msg.getLong(STORE_LIMIT));
-        }
-        if (msg.itemExists(TEMP_LIMIT)) {
-            tempLimit.set(msg.getLong(TEMP_LIMIT));
-        }
-
-        if (msg.itemExists(MEMORY_USAGE)) {
-            memoryUsage.set(msg.getLong(MEMORY_USAGE));
-        }
-        if (msg.itemExists(STORE_USAGE)) {
-            storeUsage.set(msg.getLong(STORE_USAGE));
-        }
-        if (msg.itemExists(TEMP_USAGE)) {
-            tempUsage.set(msg.getLong(TEMP_USAGE));
-        }
-    }
-
-}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
index 1585969abf..d4ed4856b0 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
@@ -62,7 +62,7 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
     private final MetricFactory metricFactory;
     private final GaugeRegistry gaugeRegistry;
 
-    private final Map<String, ActiveMQStatistics> registeredStatistics = new HashMap<>();
+    private final Map<String, ActiveMQMetrics> registeredStatistics = new HashMap<>();
 
     private Disposable disposable;
 
@@ -75,19 +75,18 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
 
     @Override
     public void collectBrokerStatistics() {
-        collectStatistics(new ActiveMQBrokerStatistics());
+        collectStatistics(ActiveMQMetrics.forBroker(gaugeRegistry));
     }
 
     @Override
     public void collectQueueStatistics(MailQueueName name) {
-        collectStatistics(ActiveMQQueueStatistics.from(name.asString()));
+        collectStatistics(ActiveMQMetrics.forQueue(name.asString(), gaugeRegistry));
     }
 
-    private void collectStatistics(ActiveMQStatistics statistics) {
+    private void collectStatistics(ActiveMQMetrics statistics) {
         if (!registeredStatistics.containsKey(statistics.getName())) {
             LOGGER.info("collecting statistics for {}", statistics.getName());
             registeredStatistics.put(statistics.getName(), statistics);
-            statistics.registerMetrics(gaugeRegistry);
         }
     }
 
@@ -120,7 +119,7 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
     }
 
     @VisibleForTesting
-    Void fetchAndUpdate(ActiveMQStatistics stats) throws JMSException {
+    Void fetchAndUpdate(ActiveMQMetrics stats) throws JMSException {
         Connection connection = null;
         Session session = null;
         TemporaryQueue replyTo = null;
@@ -146,7 +145,7 @@ public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
             } else if (!(reply instanceof MapMessage)) {
                 throw new JMSException("expected MapMessage but got " + reply.getClass());
             }
-            stats.update((MapMessage)reply);
+            stats.updateMetrics((MapMessage)reply);
         } finally {
             if (producer != null) {
                 producer.close();
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetrics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetrics.java
new file mode 100644
index 0000000000..6a6a1bfc37
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetrics.java
@@ -0,0 +1,176 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+import org.apache.james.metrics.api.GaugeRegistry.SettableGauge;
+
+public class ActiveMQMetrics {
+    private static final String TEMP_PERCENT_USAGE = "tempPercentUsage";
+    private static final String STORE_PERCENT_USAGE = "storePercentUsage";
+    private static final String MEMORY_PERCENT_USAGE = "memoryPercentUsage";
+
+    private static final String MEMORY_LIMIT = "memoryLimit";
+    private static final String STORE_LIMIT = "storeLimit";
+    private static final String TEMP_LIMIT = "tempLimit";
+
+    private static final String MEMORY_USAGE = "memoryUsage";
+    private static final String STORE_USAGE = "storeUsage";
+    private static final String TEMP_USAGE = "tempUsage";
+
+    private static final String SIZE = "size";
+    private static final String ENQUEUE_COUNT = "enqueueCount";
+    private static final String DEQUEUE_COUNT = "dequeueCount";
+    private static final String INFLIGHT_COUNT = "inflightCount";
+    private static final String PRODUCER_COUNT = "producerCount";
+    private static final String CONSUMER_COUNT = "consumerCount";
+    private static final String EXPIRED_COUNT = "expiredCount";
+    private static final String DISPATCH_COUNT = "dispatchCount";
+    private static final String MESSAGES_CACHED = "messagesCached";
+
+    private static final String MIN_ENQUEUE_TIME = "minEnqueueTime";
+    private static final String MAX_ENQUEUE_TIME = "maxEnqueueTime";
+    private static final String AVERAGE_ENQUEUE_TIME = "averageEnqueueTime";
+
+    private static final String LAST_UPDATE = "lastUpdate";
+
+    private final String statsQueueName;
+
+    private final GaugeRegistry gaugeRegistry;
+
+    private final Map<String, SettableGauge<Integer>> registeredGaugesOfInteger = new HashMap<>();
+    private final Map<String, SettableGauge<Double>> registeredGaugesOfDouble = new HashMap<>();
+    private final Map<String, SettableGauge<Long>> registeredGaugesOfLong = new HashMap<>();
+
+    public static ActiveMQMetrics forQueue(String queueName, GaugeRegistry gaugeRegistry) {
+        return new ActiveMQMetrics("ActiveMQ.Statistics.Destination." + queueName, gaugeRegistry);
+    }
+
+    public static ActiveMQMetrics forBroker(GaugeRegistry gaugeRegistry) {
+        return new ActiveMQMetrics("ActiveMQ.Statistics.Broker", gaugeRegistry);
+    }
+
+    private ActiveMQMetrics(String statsQueueName, GaugeRegistry gaugeRegistry) {
+        this.statsQueueName = statsQueueName;
+        this.gaugeRegistry = gaugeRegistry;
+    }
+
+    public String getName() {
+        return statsQueueName;
+    }
+
+    public void updateMetrics(MapMessage msg) throws JMSException {
+
+        if (msg.itemExists(MEMORY_PERCENT_USAGE)) {
+            setGaugeAndRegisterIfAbsent(MEMORY_PERCENT_USAGE, msg.getInt(MEMORY_PERCENT_USAGE));
+        }
+        if (msg.itemExists(STORE_PERCENT_USAGE)) {
+            setGaugeAndRegisterIfAbsent(STORE_PERCENT_USAGE, msg.getInt(STORE_PERCENT_USAGE));
+        }
+        if (msg.itemExists(TEMP_PERCENT_USAGE)) {
+            setGaugeAndRegisterIfAbsent(TEMP_PERCENT_USAGE, msg.getInt(TEMP_PERCENT_USAGE));
+        }
+
+        if (msg.itemExists(MEMORY_LIMIT)) {
+            setGaugeAndRegisterIfAbsent(MEMORY_LIMIT, msg.getLong(MEMORY_LIMIT));
+        }
+        if (msg.itemExists(STORE_LIMIT)) {
+            setGaugeAndRegisterIfAbsent(STORE_LIMIT, msg.getLong(STORE_LIMIT));
+        }
+        if (msg.itemExists(TEMP_LIMIT)) {
+            setGaugeAndRegisterIfAbsent(TEMP_LIMIT, msg.getLong(TEMP_LIMIT));
+        }
+
+        if (msg.itemExists(MEMORY_USAGE)) {
+            setGaugeAndRegisterIfAbsent(MEMORY_USAGE, msg.getLong(MEMORY_USAGE));
+        }
+        if (msg.itemExists(STORE_USAGE)) {
+            setGaugeAndRegisterIfAbsent(STORE_USAGE, msg.getLong(STORE_USAGE));
+        }
+        if (msg.itemExists(TEMP_USAGE)) {
+            setGaugeAndRegisterIfAbsent(TEMP_USAGE, msg.getLong(TEMP_USAGE));
+        }
+
+        if (msg.itemExists(SIZE)) {
+            setGaugeAndRegisterIfAbsent(SIZE, msg.getLong(SIZE));
+        }
+        if (msg.itemExists(ENQUEUE_COUNT)) {
+            setGaugeAndRegisterIfAbsent(ENQUEUE_COUNT, msg.getLong(ENQUEUE_COUNT));
+        }
+        if (msg.itemExists(DEQUEUE_COUNT)) {
+            setGaugeAndRegisterIfAbsent(DEQUEUE_COUNT, msg.getLong(DEQUEUE_COUNT));
+        }
+        if (msg.itemExists(INFLIGHT_COUNT)) {
+            setGaugeAndRegisterIfAbsent(INFLIGHT_COUNT, msg.getLong(INFLIGHT_COUNT));
+        }
+        if (msg.itemExists(PRODUCER_COUNT)) {
+            setGaugeAndRegisterIfAbsent(PRODUCER_COUNT, msg.getLong(PRODUCER_COUNT));
+        }
+        if (msg.itemExists(CONSUMER_COUNT)) {
+            setGaugeAndRegisterIfAbsent(CONSUMER_COUNT, msg.getLong(CONSUMER_COUNT));
+        }
+        if (msg.itemExists(EXPIRED_COUNT)) {
+            setGaugeAndRegisterIfAbsent(EXPIRED_COUNT, msg.getLong(EXPIRED_COUNT));
+        }
+        if (msg.itemExists(DISPATCH_COUNT)) {
+            setGaugeAndRegisterIfAbsent(DISPATCH_COUNT, msg.getLong(DISPATCH_COUNT));
+        }
+        if (msg.itemExists(MESSAGES_CACHED)) {
+            setGaugeAndRegisterIfAbsent(MESSAGES_CACHED, msg.getLong(MESSAGES_CACHED));
+        }
+
+        if (msg.itemExists(MIN_ENQUEUE_TIME)) {
+            setGaugeAndRegisterIfAbsent(MIN_ENQUEUE_TIME, msg.getDouble(MIN_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(MAX_ENQUEUE_TIME)) {
+            setGaugeAndRegisterIfAbsent(MAX_ENQUEUE_TIME, msg.getDouble(MAX_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(AVERAGE_ENQUEUE_TIME)) {
+            setGaugeAndRegisterIfAbsent(AVERAGE_ENQUEUE_TIME, msg.getDouble(AVERAGE_ENQUEUE_TIME));
+        }
+
+        setGaugeAndRegisterIfAbsent(LAST_UPDATE, System.currentTimeMillis());
+    }
+
+    private void setGaugeAndRegisterIfAbsent(String name, long val) {
+        String key = statsQueueName + "." + name;
+        registeredGaugesOfLong.computeIfAbsent(key, any -> gaugeRegistry.settableGauge(key))
+            .setValue(val);
+    }
+
+    private void setGaugeAndRegisterIfAbsent(String name, int val) {
+        String key = statsQueueName + "." + name;
+        registeredGaugesOfInteger.computeIfAbsent(key, any -> gaugeRegistry.settableGauge(key))
+            .setValue(val);
+    }
+
+    private void setGaugeAndRegisterIfAbsent(String name, double val) {
+        String key = statsQueueName + "." + name;
+        registeredGaugesOfDouble.computeIfAbsent(key, any -> gaugeRegistry.settableGauge(key))
+            .setValue(val);
+    }
+
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQQueueStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQQueueStatistics.java
deleted file mode 100644
index f92c99abb0..0000000000
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQQueueStatistics.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.queue.activemq.metric;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-
-import org.apache.james.metrics.api.GaugeRegistry;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class ActiveMQQueueStatistics implements ActiveMQStatistics {
-    protected static final String MEMORY_LIMIT = "memoryLimit";
-
-    protected static final String SIZE = "size";
-    protected static final String ENQUEUE_COUNT = "enqueueCount";
-    protected static final String DEQUEUE_COUNT = "dequeueCount";
-    protected static final String INFLIGHT_COUNT = "inflightCount";
-    protected static final String PRODUCER_COUNT = "producerCount";
-    protected static final String CONSUMER_COUNT = "consumerCount";
-    protected static final String EXPIRED_COUNT = "expiredCount";
-    protected static final String DISPATCH_COUNT = "dispatchCount";
-    protected static final String MESSAGES_CACHED = "messagesCached";
-
-    protected static final String MIN_ENQUEUE_TIME = "minEnqueueTime";
-    protected static final String MAX_ENQUEUE_TIME = "maxEnqueueTime";
-    protected static final String AVERAGE_ENQUEUE_TIME = "averageEnqueueTime";
-
-    protected static final String LAST_UPDATE = "lastUpdate";
-
-    private final AtomicLong memoryLimit = new AtomicLong();
-
-    private final AtomicLong size = new AtomicLong();
-    private final AtomicLong enqueueCount = new AtomicLong();
-    private final AtomicLong dequeueCount = new AtomicLong();
-    private final AtomicLong inflightCount = new AtomicLong();
-    private final AtomicLong producerCount = new AtomicLong();
-    private final AtomicLong consumerCount = new AtomicLong();
-    private final AtomicLong expiredCount = new AtomicLong();
-    private final AtomicLong dispatchCount = new AtomicLong();
-    private final AtomicLong messagesCached = new AtomicLong();
-
-    private final AtomicDouble minEnqueueTime = new AtomicDouble();
-    private final AtomicDouble maxEnqueueTime = new AtomicDouble();
-    private final AtomicDouble averageEnqueueTime = new AtomicDouble();
-
-    private final AtomicLong lastUpdate = new AtomicLong();
-
-    protected final String statsQueueName;
-
-    public static ActiveMQQueueStatistics from(String queueName) {
-        return new ActiveMQQueueStatistics("ActiveMQ.Statistics.Destination." + queueName);
-    }
-
-    public ActiveMQQueueStatistics(String statsQueueName) {
-        this.statsQueueName = statsQueueName;
-    }
-
-    @Override
-    public String getName() {
-        return statsQueueName;
-    }
-
-    @Override
-    public void registerMetrics(GaugeRegistry gaugeRegistry) {
-        String prefix = statsQueueName + ".";
-
-        gaugeRegistry.register(prefix + MEMORY_LIMIT, memoryLimit::get);
-
-        gaugeRegistry.register(prefix + SIZE, size::get);
-        gaugeRegistry.register(prefix + ENQUEUE_COUNT, enqueueCount::get);
-        gaugeRegistry.register(prefix + DEQUEUE_COUNT, dequeueCount::get);
-        gaugeRegistry.register(prefix + INFLIGHT_COUNT, inflightCount::get);
-        gaugeRegistry.register(prefix + PRODUCER_COUNT, producerCount::get);
-        gaugeRegistry.register(prefix + CONSUMER_COUNT, consumerCount::get);
-        gaugeRegistry.register(prefix + EXPIRED_COUNT, expiredCount::get);
-        gaugeRegistry.register(prefix + DISPATCH_COUNT, dispatchCount::get);
-        gaugeRegistry.register(prefix + MESSAGES_CACHED, messagesCached::get);
-
-        gaugeRegistry.register(prefix + MIN_ENQUEUE_TIME, minEnqueueTime::get);
-        gaugeRegistry.register(prefix + MAX_ENQUEUE_TIME, maxEnqueueTime::get);
-        gaugeRegistry.register(prefix + AVERAGE_ENQUEUE_TIME, averageEnqueueTime::get);
-
-        gaugeRegistry.register(prefix + LAST_UPDATE, lastUpdate::get);
-    }
-
-    @Override
-    public void update(MapMessage msg) throws JMSException {
-        if (msg.itemExists(MEMORY_LIMIT)) {
-            memoryLimit.set(msg.getLong(MEMORY_LIMIT));
-        }
-        if (msg.itemExists(SIZE)) {
-            size.set(msg.getLong(SIZE));
-        }
-        if (msg.itemExists(ENQUEUE_COUNT)) {
-            enqueueCount.set(msg.getLong(ENQUEUE_COUNT));
-        }
-        if (msg.itemExists(DEQUEUE_COUNT)) {
-            dequeueCount.set(msg.getLong(DEQUEUE_COUNT));
-        }
-        if (msg.itemExists(INFLIGHT_COUNT)) {
-            inflightCount.set(msg.getLong(INFLIGHT_COUNT));
-        }
-        if (msg.itemExists(PRODUCER_COUNT)) {
-            producerCount.set(msg.getLong(PRODUCER_COUNT));
-        }
-        if (msg.itemExists(CONSUMER_COUNT)) {
-            consumerCount.set(msg.getLong(CONSUMER_COUNT));
-        }
-        if (msg.itemExists(EXPIRED_COUNT)) {
-            expiredCount.set(msg.getLong(EXPIRED_COUNT));
-        }
-        if (msg.itemExists(DISPATCH_COUNT)) {
-            dispatchCount.set(msg.getLong(DISPATCH_COUNT));
-        }
-        if (msg.itemExists(MESSAGES_CACHED)) {
-            messagesCached.set(msg.getLong(MESSAGES_CACHED));
-        }
-
-        if (msg.itemExists(MIN_ENQUEUE_TIME)) {
-            minEnqueueTime.set(msg.getDouble(MIN_ENQUEUE_TIME));
-        }
-        if (msg.itemExists(MAX_ENQUEUE_TIME)) {
-            maxEnqueueTime.set(msg.getDouble(MAX_ENQUEUE_TIME));
-        }
-        if (msg.itemExists(AVERAGE_ENQUEUE_TIME)) {
-            averageEnqueueTime.set(msg.getDouble(AVERAGE_ENQUEUE_TIME));
-        }
-
-        lastUpdate.set(System.currentTimeMillis());
-    }
-
-    public long getLastUpdate() {
-        return lastUpdate.get();
-    }
-}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQStatistics.java
deleted file mode 100644
index 52d10504e9..0000000000
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQStatistics.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.queue.activemq.metric;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-
-import org.apache.james.metrics.api.GaugeRegistry;
-
-public interface ActiveMQStatistics {
-
-    String getName();
-
-    void registerMetrics(GaugeRegistry gaugeRegistry);
-
-    void update(MapMessage msg) throws JMSException;
-}
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
index f8a00e89cf..d1d02dea2e 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
@@ -49,11 +49,9 @@ import org.junit.jupiter.api.extension.ExtendWith;
 class ActiveMQMetricCollectorTest {
 
     private static ActiveMQConnectionFactory connectionFactory;
-    //private BrokerService broker;
 
     @BeforeAll
     static void setup(BrokerService broker) {
-        //this.broker = broker;
         connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
         ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
         prefetchPolicy.setQueuePrefetch(0);
@@ -62,38 +60,40 @@ class ActiveMQMetricCollectorTest {
 
     @Test
     void shouldFailToFetchAndUpdateStatisticsForUnknownQueue() {
-        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), new NoopGaugeRegistry());
-        ActiveMQQueueStatistics queueStatistics = new ActiveMQQueueStatistics("UNKNOWN");
+        SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+        ActiveMQMetrics queueStatistics = ActiveMQMetrics.forQueue("UNKNOWN", gaugeRegistry);
 
         assertThatThrownBy(() -> testee.fetchAndUpdate(queueStatistics))
             .isInstanceOf(JMSException.class);
-        assertThat(queueStatistics.getLastUpdate())
-            .isEqualTo(0);
+
+        assertThat(gaugeRegistry.getGauge("ActiveMQ.Statistics.Destination.UNKNOWN.lastUpdate")).isNull();
     }
 
     @Test
     void shouldFetchAndUpdateBrokerStatistics() throws Exception {
-        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), new NoopGaugeRegistry());
-        ActiveMQBrokerStatistics brokerStatistics = new ActiveMQBrokerStatistics();
+        SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+        ActiveMQMetrics brokerStatistics = ActiveMQMetrics.forBroker(gaugeRegistry);
 
         long notBefore = System.currentTimeMillis();
         testee.fetchAndUpdate(brokerStatistics);
-        assertThat(brokerStatistics.getLastUpdate())
-            .isGreaterThanOrEqualTo(notBefore);
+        Number n = gaugeRegistry.getGauge("ActiveMQ.Statistics.Broker.lastUpdate");
+        assertThat(n).isInstanceOf(Long.class);
+        assertThat((Long) n).isGreaterThanOrEqualTo(notBefore);
     }
 
     @Test
     void shouldFetchAndUpdateBrokerStatisticsInGaugeRegistry() throws Exception {
         SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
         ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
-        ActiveMQBrokerStatistics brokerStatistics = new ActiveMQBrokerStatistics();
-        brokerStatistics.registerMetrics(gaugeRegistry);
+        ActiveMQMetrics brokerStatistics = ActiveMQMetrics.forBroker(gaugeRegistry);
 
         testee.fetchAndUpdate(brokerStatistics);
 
-        Supplier<?> supplier = gaugeRegistry.getGauge("ActiveMQ.Statistics.Broker.storeLimit");
-        assertThat(supplier.get()).isInstanceOf(Long.class);
-        assertThat((Long) supplier.get()).isGreaterThan(0);
+        Number n = gaugeRegistry.getGauge("ActiveMQ.Statistics.Broker.storeLimit");
+        assertThat(n).isInstanceOf(Long.class);
+        assertThat((Long) n).isGreaterThan(0);
     }
 
     @Test
@@ -123,8 +123,17 @@ class ActiveMQMetricCollectorTest {
             return this;
         }
 
-        public Gauge<?> getGauge(String name) {
-            return gauges.get(name);
+        @Override
+        public <T> SettableGauge<T> settableGauge(String name) {
+            return t -> gauges.put(name, () -> t);
+        }
+
+        public Number getGauge(String name) {
+            Gauge<?> g = gauges.get(name);
+            if (g == null) {
+                return null;
+            }
+            return (Number) g.get();
         }
     }
 }
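
The pattern this commit introduces in ActiveMQMetrics (create a settable
gauge the first time a statistic shows up in a reply, then only push new
values into it) can be sketched in isolation. The following is a minimal
sketch, not the James implementation: SettableGaugeSketch,
InMemoryGaugeRegistry and Metrics are hypothetical names, the registry is a
plain in-memory map, and the reply is modelled as a Map instead of a JMS
MapMessage.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class SettableGaugeSketch {

    /** Hypothetical stand-in for the SettableGauge interface added by the commit. */
    interface SettableGauge<T> {
        void setValue(T value);
    }

    /** Hypothetical in-memory registry: keeps the latest value per gauge name. */
    static final class InMemoryGaugeRegistry {
        private final Map<String, AtomicReference<Object>> values = new HashMap<>();

        <T> SettableGauge<T> settableGauge(String name) {
            AtomicReference<Object> holder =
                values.computeIfAbsent(name, any -> new AtomicReference<>());
            return holder::set;
        }

        /** Returns the last value set for the gauge, or null if it was never registered or set. */
        Object read(String name) {
            AtomicReference<Object> holder = values.get(name);
            return holder == null ? null : holder.get();
        }
    }

    /** Registers a gauge lazily on the first update, then reuses it. */
    static final class Metrics {
        private final String prefix;
        private final InMemoryGaugeRegistry registry;
        private final Map<String, SettableGauge<Long>> longGauges = new HashMap<>();

        Metrics(String prefix, InMemoryGaugeRegistry registry) {
            this.prefix = prefix;
            this.registry = registry;
        }

        // The reply is an assumption: a Map standing in for the statistics MapMessage.
        void update(Map<String, Long> reply) {
            reply.forEach((name, value) -> {
                String key = prefix + "." + name;
                longGauges.computeIfAbsent(key, any -> registry.settableGauge(key))
                    .setValue(value);
            });
        }
    }

    public static void main(String[] args) {
        InMemoryGaugeRegistry registry = new InMemoryGaugeRegistry();
        Metrics broker = new Metrics("ActiveMQ.Statistics.Broker", registry);
        broker.update(Map.of("storeLimit", 1024L));
        System.out.println(registry.read("ActiveMQ.Statistics.Broker.storeLimit"));
    }
}
```

Because nothing is registered until a value arrives, a statistics request
that fails leaves no gauge behind, which is what the reworked
shouldFailToFetchAndUpdateStatisticsForUnknownQueue test relies on.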




[james-project] 02/08: [JAMES-3841] ActiveMQ: restore simple connection health check and move collecting statistics into separate class (i.e. broker stats, per-queue stats) with periodic polling

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cffb52de58e549511b9bf18aa062ff912ac6e3b6
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Mon Nov 28 00:00:22 2022 +0100

    [JAMES-3841] ActiveMQ: restore simple connection health check and move collecting statistics into separate class (i.e. broker stats, per-queue stats) with periodic polling
---
 server/container/guice/queue/activemq/pom.xml      |   4 +
 .../queue/activemq/ActiveMQQueueModule.java        |  14 ++
 .../queue/activemq/ActiveMQBrokerStatistics.java   | 229 ---------------------
 .../james/queue/activemq/ActiveMQHealthCheck.java  |  92 +--------
 .../queue/activemq/ActiveMQMailQueueFactory.java   |  12 +-
 .../activemq/metric/ActiveMQBrokerStatistics.java  | 106 ++++++++++
 .../activemq/metric/ActiveMQMetricCollector.java   |  35 ++++
 .../metric/ActiveMQMetricCollectorImpl.java        | 178 ++++++++++++++++
 .../metric/ActiveMQMetricCollectorNoop.java        |  41 ++++
 .../activemq/metric/ActiveMQQueueStatistics.java   | 155 ++++++++++++++
 .../queue/activemq/metric/ActiveMQStatistics.java  |  34 +++
 .../queue/activemq/ActiveMQHealthCheckTest.java    |   5 +-
 .../activemq/ActiveMQMailQueueFactoryTest.java     |   8 +-
 .../metric/ActiveMQMetricCollectorTest.java        | 131 ++++++++++++
 14 files changed, 718 insertions(+), 326 deletions(-)
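
The commit message describes registering statistics once per name and
refreshing them by periodic polling. A minimal sketch of that idea follows,
under stated assumptions: PollingCollectorSketch and StatsSource are
hypothetical names, and java.util.concurrent.ScheduledExecutorService is
swapped in for whatever scheduling the real ActiveMQMetricCollectorImpl
uses. A failing fetch keeps the previous value instead of aborting the poll.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PollingCollectorSketch {

    /** Hypothetical source of one statistics value; may fail like a JMS request. */
    interface StatsSource {
        long fetch() throws Exception;
    }

    private final Map<String, StatsSource> registered = new LinkedHashMap<>();
    private final Map<String, Long> latest = new ConcurrentHashMap<>();
    private ScheduledExecutorService scheduler;

    /** Registers a source under a name; a second registration for the same name is ignored. */
    synchronized void collect(String name, StatsSource source) {
        registered.putIfAbsent(name, source);
    }

    /** Fetches every registered source once; a failure leaves the old value untouched. */
    synchronized void pollOnce() {
        registered.forEach((name, source) -> {
            try {
                latest.put(name, source.fetch());
            } catch (Exception e) {
                // Deliberately swallowed in this sketch; real code would log the failure.
            }
        });
    }

    /** Starts polling at a fixed rate; calling it again while running has no effect. */
    synchronized void start(long periodMillis) {
        if (scheduler == null) {
            scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.scheduleAtFixedRate(this::pollOnce, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stop() {
        if (scheduler != null) {
            scheduler.shutdownNow();
            scheduler = null;
        }
    }

    Long latest(String name) {
        return latest.get(name);
    }

    public static void main(String[] args) {
        PollingCollectorSketch collector = new PollingCollectorSketch();
        collector.collect("broker", () -> 7L);
        collector.pollOnce();
        System.out.println(collector.latest("broker"));
    }
}
```

Keeping pollOnce() separate from start() lets a test drive a single poll
directly, much as the test in this commit calls fetchAndUpdate without
waiting for a timer.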

diff --git a/server/container/guice/queue/activemq/pom.xml b/server/container/guice/queue/activemq/pom.xml
index 3095e68d9d..aba2d14b55 100644
--- a/server/container/guice/queue/activemq/pom.xml
+++ b/server/container/guice/queue/activemq/pom.xml
@@ -32,6 +32,10 @@
     <description>Guice Module for ActiveMQ</description>
 
     <dependencies>
+        <dependency>
+            <groupId>${james.groupId}</groupId>
+            <artifactId>james-server-guice-configuration</artifactId>
+        </dependency>
         <dependency>
             <groupId>${james.groupId}</groupId>
             <artifactId>james-server-queue-activemq</artifactId>
diff --git a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
index d9609ae358..da56c98306 100644
--- a/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
+++ b/server/container/guice/queue/activemq/src/main/java/org/apache/james/modules/queue/activemq/ActiveMQQueueModule.java
@@ -27,15 +27,20 @@ import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.queue.activemq.ActiveMQHealthCheck;
 import org.apache.james.queue.activemq.ActiveMQMailQueueFactory;
 import org.apache.james.queue.activemq.EmbeddedActiveMQ;
+import org.apache.james.queue.activemq.metric.ActiveMQMetricCollector;
+import org.apache.james.queue.activemq.metric.ActiveMQMetricCollectorImpl;
 import org.apache.james.queue.api.MailQueue;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.ManageableMailQueue;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Scopes;
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
+import com.google.inject.multibindings.ProvidesIntoSet;
 
 public class ActiveMQQueueModule extends AbstractModule {
 
@@ -45,6 +50,8 @@ public class ActiveMQQueueModule extends AbstractModule {
         bind(KahaDBPersistenceAdapter.class).in(Scopes.SINGLETON);
         bind(EmbeddedActiveMQ.class).in(Scopes.SINGLETON);
         bind(ActiveMQMailQueueFactory.class).in(Scopes.SINGLETON);
+        bind(ActiveMQMetricCollector.class).to(ActiveMQMetricCollectorImpl.class);
+        bind(ActiveMQMetricCollectorImpl.class).in(Scopes.SINGLETON);
 
         Multibinder.newSetBinder(binder(), HealthCheck.class).addBinding().to(ActiveMQHealthCheck.class);
     }
@@ -74,4 +81,11 @@ public class ActiveMQQueueModule extends AbstractModule {
     public MailQueueFactory<? extends MailQueue> provideMailQueueFactoryGenerics(ActiveMQMailQueueFactory queueFactory) {
         return queueFactory;
     }
+
+    @ProvidesIntoSet
+    InitializationOperation configureMetricCollector(ActiveMQMetricCollector metricCollector) {
+        return InitilizationOperationBuilder
+            .forClass(ActiveMQMetricCollector.class)
+            .init(metricCollector::start);
+    }
 }
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
deleted file mode 100644
index e64ff380d9..0000000000
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-
-package org.apache.james.queue.activemq;
-
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-
-import org.apache.james.metrics.api.GaugeRegistry;
-
-import com.google.common.util.concurrent.AtomicDouble;
-
-public class ActiveMQBrokerStatistics {
-
-    public static final String STATS_QUEUE_NAME = "ActiveMQ.Statistics.Broker";
-
-    private static final String TEMP_PERCENT_USAGE = "tempPercentUsage";
-    private static final String STORE_PERCENT_USAGE = "storePercentUsage";
-    private static final String MEMORY_PERCENT_USAGE = "memoryPercentUsage";
-
-    private static final String STORE_LIMIT = "storeLimit";
-    private static final String MEMORY_LIMIT = "memoryLimit";
-    private static final String TEMP_LIMIT = "tempLimit";
-
-    private static final String MEMORY_USAGE = "memoryUsage";
-    private static final String STORE_USAGE = "storeUsage";
-    private static final String TEMP_USAGE = "tempUsage";
-
-    private static final String SIZE = "size";
-    private static final String ENQUEUE_COUNT = "enqueueCount";
-    private static final String DEQUEUE_COUNT = "dequeueCount";
-    private static final String INFLIGHT_COUNT = "inflightCount";
-    private static final String PRODUCER_COUNT = "producerCount";
-    private static final String CONSUMER_COUNT = "consumerCount";
-    private static final String EXPIRED_COUNT = "expiredCount";
-    private static final String DISPATCH_COUNT = "dispatchCount";
-    private static final String MESSAGES_CACHED = "messagesCached";
-
-    private static final String MIN_ENQUEUE_TIME = "minEnqueueTime";
-    private static final String MAX_ENQUEUE_TIME = "maxEnqueueTime";
-    private static final String AVERAGE_ENQUEUE_TIME = "averageEnqueueTime";
-
-    private final AtomicLong lastUpdate = new AtomicLong();
-
-    private final AtomicInteger memoryPercentUsage = new AtomicInteger();
-    private final AtomicInteger storePercentUsage = new AtomicInteger();
-    private final AtomicInteger tempPercentUsage = new AtomicInteger();
-
-    private final AtomicLong memoryLimit = new AtomicLong();
-    private final AtomicLong storeLimit = new AtomicLong();
-    private final AtomicLong tempLimit = new AtomicLong();
-
-    private final AtomicLong memoryUsage = new AtomicLong();
-    private final AtomicLong storeUsage = new AtomicLong();
-    private final AtomicLong tempUsage = new AtomicLong();
-
-    private final AtomicLong size = new AtomicLong();
-    private final AtomicLong enqueueCount = new AtomicLong();
-    private final AtomicLong dequeueCount = new AtomicLong();
-    private final AtomicLong inflightCount = new AtomicLong();
-    private final AtomicLong producerCount = new AtomicLong();
-    private final AtomicLong consumerCount = new AtomicLong();
-    private final AtomicLong expiredCount = new AtomicLong();
-    private final AtomicLong dispatchCount = new AtomicLong();
-    private final AtomicLong messagesCached = new AtomicLong();
-
-    private final AtomicDouble minEnqueueTime = new AtomicDouble();
-    private final AtomicDouble maxEnqueueTime = new AtomicDouble();
-    private final AtomicDouble averageEnqueueTime = new AtomicDouble();
-
-    public ActiveMQBrokerStatistics(GaugeRegistry gaugeRegistry) {
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_PERCENT_USAGE, memoryPercentUsage::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_PERCENT_USAGE, storePercentUsage::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_PERCENT_USAGE, tempPercentUsage::get);
-
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_LIMIT, memoryLimit::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_LIMIT, storeLimit::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_LIMIT, tempLimit::get);
-
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_USAGE, memoryUsage::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_USAGE, storeUsage::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_USAGE, tempUsage::get);
-
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + SIZE, size::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + ENQUEUE_COUNT, enqueueCount::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + DEQUEUE_COUNT, dequeueCount::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + INFLIGHT_COUNT, inflightCount::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + PRODUCER_COUNT, producerCount::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + CONSUMER_COUNT, consumerCount::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + EXPIRED_COUNT, expiredCount::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + DISPATCH_COUNT, dispatchCount::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MESSAGES_CACHED, messagesCached::get);
-
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MIN_ENQUEUE_TIME, minEnqueueTime::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + MAX_ENQUEUE_TIME, maxEnqueueTime::get);
-        gaugeRegistry.register(STATS_QUEUE_NAME + "." + AVERAGE_ENQUEUE_TIME, averageEnqueueTime::get);
-
-        gaugeRegistry.register(STATS_QUEUE_NAME + ".lastUpdate", lastUpdate::get);
-    }
-
-    public void update(MapMessage msg) throws JMSException {
-        if (msg.itemExists(MEMORY_PERCENT_USAGE)) {
-            memoryPercentUsage.set(msg.getInt(MEMORY_PERCENT_USAGE));
-        }
-        if (msg.itemExists(STORE_PERCENT_USAGE)) {
-            storePercentUsage.set(msg.getInt(STORE_PERCENT_USAGE));
-        }
-        if (msg.itemExists(TEMP_PERCENT_USAGE)) {
-            tempPercentUsage.set(msg.getInt(TEMP_PERCENT_USAGE));
-        }
-
-        if (msg.itemExists(MEMORY_LIMIT)) {
-            memoryLimit.set(msg.getLong(MEMORY_LIMIT));
-        }
-        if (msg.itemExists(STORE_LIMIT)) {
-            storeLimit.set(msg.getLong(STORE_LIMIT));
-        }
-        if (msg.itemExists(TEMP_LIMIT)) {
-            tempLimit.set(msg.getLong(TEMP_LIMIT));
-        }
-
-        if (msg.itemExists(MEMORY_USAGE)) {
-            memoryUsage.set(msg.getLong(MEMORY_USAGE));
-        }
-        if (msg.itemExists(STORE_USAGE)) {
-            storeUsage.set(msg.getLong(STORE_USAGE));
-        }
-        if (msg.itemExists(TEMP_USAGE)) {
-            tempUsage.set(msg.getLong(TEMP_USAGE));
-        }
-
-        if (msg.itemExists(SIZE)) {
-            size.set(msg.getLong(SIZE));
-        }
-        if (msg.itemExists(ENQUEUE_COUNT)) {
-            enqueueCount.set(msg.getLong(ENQUEUE_COUNT));
-        }
-        if (msg.itemExists(DEQUEUE_COUNT)) {
-            dequeueCount.set(msg.getLong(DEQUEUE_COUNT));
-        }
-        if (msg.itemExists(INFLIGHT_COUNT)) {
-            inflightCount.set(msg.getLong(INFLIGHT_COUNT));
-        }
-        if (msg.itemExists(PRODUCER_COUNT)) {
-            producerCount.set(msg.getLong(PRODUCER_COUNT));
-        }
-        if (msg.itemExists(CONSUMER_COUNT)) {
-            consumerCount.set(msg.getLong(CONSUMER_COUNT));
-        }
-        if (msg.itemExists(EXPIRED_COUNT)) {
-            expiredCount.set(msg.getLong(EXPIRED_COUNT));
-        }
-        if (msg.itemExists(DISPATCH_COUNT)) {
-            dispatchCount.set(msg.getLong(DISPATCH_COUNT));
-        }
-        if (msg.itemExists(MESSAGES_CACHED)) {
-            messagesCached.set(msg.getLong(MESSAGES_CACHED));
-        }
-
-        if (msg.itemExists(MIN_ENQUEUE_TIME)) {
-            minEnqueueTime.set(msg.getDouble(MIN_ENQUEUE_TIME));
-        }
-        if (msg.itemExists(MAX_ENQUEUE_TIME)) {
-            maxEnqueueTime.set(msg.getDouble(MAX_ENQUEUE_TIME));
-        }
-        if (msg.itemExists(AVERAGE_ENQUEUE_TIME)) {
-            averageEnqueueTime.set(msg.getDouble(AVERAGE_ENQUEUE_TIME));
-        }
-
-        lastUpdate.set(System.currentTimeMillis());
-    }
-
-    public long getLastUpdate() {
-        return lastUpdate.get();
-    }
-
-    /*
-    vm=vm://localhost
-    - memoryUsage=0
-    - storeUsage=3330
-    - tempPercentUsage=0
-    ssl=
-    openwire=tcp://localhost:50059
-    brokerId=ID:bigmac-50057-1253605065511-0:0
-    - consumerCount=2
-    brokerName=localhost
-    - expiredCount=0
-    - dispatchCount=1
-    - maxEnqueueTime=5.0
-    - storePercentUsage=0
-    - dequeueCount=0
-    - inflightCount=1
-    - messagesCached=0
-    - tempLimit=107374182400
-    - averageEnqueueTime=5.0
-    stomp+ssl=
-    - memoryPercentUsage=0
-    - size=10
-    - tempUsage=0
-    - producerCount=1
-    - minEnqueueTime=5.0
-    dataDirectory=/Users/rajdavies/dev/projects/activemq/activemq-core/activemq-data
-    - enqueueCount=10
-    stomp=
-    - storeLimit=107374182400
-    - memoryLimit=67108864
-     */
-
-
-}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
index 07eb137794..a60b92d11b 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
@@ -19,24 +19,14 @@
 
 package org.apache.james.queue.activemq;
 
-import java.time.Duration;
-
 import javax.inject.Inject;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
 import javax.jms.Session;
-import javax.jms.TemporaryQueue;
 
 import org.apache.james.core.healthcheck.ComponentName;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.core.healthcheck.Result;
-import org.apache.james.metrics.api.GaugeRegistry;
 import org.reactivestreams.Publisher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,14 +37,11 @@ import reactor.core.scheduler.Schedulers;
 public class ActiveMQHealthCheck implements HealthCheck {
     public static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQHealthCheck.class);
     public static final ComponentName COMPONENT_NAME = new ComponentName("Embedded ActiveMQ");
-    private static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(1);
     private final ConnectionFactory connectionFactory;
-    private final ActiveMQBrokerStatistics brokerStatistics;
 
     @Inject
-    public ActiveMQHealthCheck(ConnectionFactory connectionFactory, GaugeRegistry gaugeRegistry) {
+    public ActiveMQHealthCheck(ConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
-        this.brokerStatistics = new ActiveMQBrokerStatistics(gaugeRegistry);
     }
 
     @Override
@@ -66,7 +53,9 @@ public class ActiveMQHealthCheck implements HealthCheck {
     public Publisher<Result> check() {
         return Mono.fromCallable(() -> {
             try {
-                retrieveAndUpdateBrokerStatistics();
+                Connection connection = connectionFactory.createConnection();
+                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+                session.close();
                 return Result.healthy(COMPONENT_NAME);
             } catch (Exception e) {
                 LOGGER.warn("{} is unhealthy. {}", COMPONENT_NAME.getName(), e.getMessage());
@@ -75,78 +64,5 @@ public class ActiveMQHealthCheck implements HealthCheck {
         }).subscribeOn(Schedulers.boundedElastic());
     }
 
-    private void retrieveAndUpdateBrokerStatistics() throws JMSException {
-        Connection connection = null;
-        Session session = null;
-        TemporaryQueue replyTo = null;
-        MessageConsumer consumer = null;
-        MessageProducer producer = null;
-        try {
-            connection = connectionFactory.createConnection();
-            connection.start();
-            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
-            replyTo = session.createTemporaryQueue();
-            consumer = session.createConsumer(replyTo);
-
-            Queue testQueue = session.createQueue(ActiveMQBrokerStatistics.STATS_QUEUE_NAME);
-            producer = session.createProducer(testQueue);
-            Message msg = session.createMessage();
-            msg.setJMSReplyTo(replyTo);
-            producer.send(msg);
-
-            Message reply = consumer.receive(RECEIVE_TIMEOUT.toMillis());
-            if (reply == null) {
-                throw new JMSException("no message received, timed out after " + RECEIVE_TIMEOUT);
-            } else if (!(reply instanceof MapMessage)) {
-                throw new JMSException("expected MapMessage but got " + reply.getClass());
-            }
-            brokerStatistics.update((MapMessage)reply);
-        } finally {
-            if (producer != null) {
-                try {
-                    producer.close();
-                } catch (JMSException e) {
-                    LOGGER.warn("error while closing producer", e);
-                }
-            }
-
-            if (consumer != null) {
-                try {
-                    consumer.close();
-                } catch (JMSException e) {
-                    LOGGER.warn("error while closing consumer", e);
-                }
-            }
-
-            if (replyTo != null) {
-                try {
-                    // we need to delete the temporary queue to be sure we will
-                    // free up memory if thats not done and a pool is used
-                    // its possible that we will register a new mbean in jmx for
-                    // every TemporaryQueue which will never get unregistered
-                    replyTo.delete();
-                } catch (JMSException e) {
-                    LOGGER.warn("error while deleting temporary queue", e);
-                }
-            }
-
-            if (session != null) {
-                try {
-                    session.close();
-                } catch (JMSException e) {
-                    LOGGER.warn("error while closing session", e);
-                }
-            }
-
-            if (connection != null) {
-                try {
-                    connection.close();
-                } catch (JMSException e) {
-                    LOGGER.warn("error while closing connection", e);
-                }
-            }
-        }
-    }
 }
 
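
Note on the restored check: the added lines above open a Connection and a Session and close only the session. Whether the connection is reclaimed elsewhere (for example by a pooling connection factory) is not visible from this diff. A minimal sketch of a check that releases both resources on every path could look like the following. The nested interfaces are hypothetical stand-ins so the example compiles without a JMS dependency; they are not the javax.jms API, and with JMS types that are not AutoCloseable a finally block would play the same role.

```java
// Hypothetical sketch of a connection-level health check; not the code from this commit.
class HealthCheckSketch {

    // Stand-in for a JMS session.
    interface Session extends AutoCloseable {
        @Override
        void close();
    }

    // Stand-in for a JMS connection.
    interface Connection extends AutoCloseable {
        Session createSession();

        @Override
        void close();
    }

    // Stand-in for a JMS connection factory.
    interface ConnectionFactory {
        Connection createConnection() throws Exception;
    }

    // Returns true when a connection and a session could be opened.
    // try-with-resources closes the session first, then the connection,
    // whether or not an exception was thrown in between.
    static boolean isHealthy(ConnectionFactory factory) {
        try (Connection connection = factory.createConnection();
             Session session = connection.createSession()) {
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```

The check reports unhealthy on any exception, which matches the catch-all in the diff above; only the resource handling differs.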
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
index dcfe273fc7..b026f54969 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactory.java
@@ -23,6 +23,7 @@ import javax.jms.ConnectionFactory;
 
 import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.queue.activemq.metric.ActiveMQMetricCollector;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueItemDecoratorFactory;
 import org.apache.james.queue.api.MailQueueName;
@@ -37,14 +38,18 @@ public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
 
     private boolean useBlob = true;
 
+    private final ActiveMQMetricCollector activeMQMetricCollector;
+
     public ActiveMQMailQueueFactory(ConnectionFactory connectionFactory, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, MetricFactory metricFactory,
-                                    GaugeRegistry gaugeRegistry) {
+                                    GaugeRegistry gaugeRegistry, ActiveMQMetricCollector activeMQMetricCollector) {
         super(connectionFactory, mailQueueItemDecoratorFactory, metricFactory, gaugeRegistry);
+        this.activeMQMetricCollector = activeMQMetricCollector;
     }
 
     @Inject
-    public ActiveMQMailQueueFactory(EmbeddedActiveMQ embeddedActiveMQ, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
-        this(embeddedActiveMQ.getConnectionFactory(), mailQueueItemDecoratorFactory, metricFactory, gaugeRegistry);
+    public ActiveMQMailQueueFactory(EmbeddedActiveMQ embeddedActiveMQ, MailQueueItemDecoratorFactory mailQueueItemDecoratorFactory, MetricFactory metricFactory,
+                                    GaugeRegistry gaugeRegistry, ActiveMQMetricCollector activeMQMetricCollector) {
+        this(embeddedActiveMQ.getConnectionFactory(), mailQueueItemDecoratorFactory, metricFactory, gaugeRegistry, activeMQMetricCollector);
     }
 
     public void setUseBlobMessages(boolean useBlob) {
@@ -53,6 +58,7 @@ public class ActiveMQMailQueueFactory extends JMSMailQueueFactory {
 
     @Override
     protected ManageableMailQueue createCacheableMailQueue(MailQueueName name) {
+        activeMQMetricCollector.collectQueueStatistics(name);
         return new ActiveMQCacheableMailQueue(connectionFactory, mailQueueItemDecoratorFactory, name, useBlob, metricFactory, gaugeRegistry);
     }
 }
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQBrokerStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQBrokerStatistics.java
new file mode 100644
index 0000000000..e7ae8acbd3
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQBrokerStatistics.java
@@ -0,0 +1,106 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+
+public class ActiveMQBrokerStatistics extends ActiveMQQueueStatistics {
+    public static final String TEMP_PERCENT_USAGE = "tempPercentUsage";
+    public static final String STORE_PERCENT_USAGE = "storePercentUsage";
+    public static final String MEMORY_PERCENT_USAGE = "memoryPercentUsage";
+
+    public static final String STORE_LIMIT = "storeLimit";
+    public static final String TEMP_LIMIT = "tempLimit";
+
+    public static final String MEMORY_USAGE = "memoryUsage";
+    public static final String STORE_USAGE = "storeUsage";
+    public static final String TEMP_USAGE = "tempUsage";
+
+    private final AtomicInteger memoryPercentUsage = new AtomicInteger();
+    private final AtomicInteger storePercentUsage = new AtomicInteger();
+    private final AtomicInteger tempPercentUsage = new AtomicInteger();
+
+    private final AtomicLong storeLimit = new AtomicLong();
+    private final AtomicLong tempLimit = new AtomicLong();
+
+    private final AtomicLong memoryUsage = new AtomicLong();
+    private final AtomicLong storeUsage = new AtomicLong();
+    private final AtomicLong tempUsage = new AtomicLong();
+
+    public ActiveMQBrokerStatistics() {
+        super("ActiveMQ.Statistics.Broker");
+    }
+
+    @Override
+    public void registerMetrics(GaugeRegistry gaugeRegistry) {
+        super.registerMetrics(gaugeRegistry);
+
+        String prefix = getName() + ".";
+        gaugeRegistry.register(prefix + MEMORY_PERCENT_USAGE, memoryPercentUsage::get);
+        gaugeRegistry.register(prefix + STORE_PERCENT_USAGE, storePercentUsage::get);
+        gaugeRegistry.register(prefix + TEMP_PERCENT_USAGE, tempPercentUsage::get);
+
+        gaugeRegistry.register(prefix + STORE_LIMIT, storeLimit::get);
+        gaugeRegistry.register(prefix + TEMP_LIMIT, tempLimit::get);
+
+        gaugeRegistry.register(prefix + MEMORY_USAGE, memoryUsage::get);
+        gaugeRegistry.register(prefix + STORE_USAGE, storeUsage::get);
+        gaugeRegistry.register(prefix + TEMP_USAGE, tempUsage::get);
+    }
+
+    @Override
+    public void update(MapMessage msg) throws JMSException {
+        super.update(msg);
+
+        if (msg.itemExists(MEMORY_PERCENT_USAGE)) {
+            memoryPercentUsage.set(msg.getInt(MEMORY_PERCENT_USAGE));
+        }
+        if (msg.itemExists(STORE_PERCENT_USAGE)) {
+            storePercentUsage.set(msg.getInt(STORE_PERCENT_USAGE));
+        }
+        if (msg.itemExists(TEMP_PERCENT_USAGE)) {
+            tempPercentUsage.set(msg.getInt(TEMP_PERCENT_USAGE));
+        }
+
+        if (msg.itemExists(STORE_LIMIT)) {
+            storeLimit.set(msg.getLong(STORE_LIMIT));
+        }
+        if (msg.itemExists(TEMP_LIMIT)) {
+            tempLimit.set(msg.getLong(TEMP_LIMIT));
+        }
+
+        if (msg.itemExists(MEMORY_USAGE)) {
+            memoryUsage.set(msg.getLong(MEMORY_USAGE));
+        }
+        if (msg.itemExists(STORE_USAGE)) {
+            storeUsage.set(msg.getLong(STORE_USAGE));
+        }
+        if (msg.itemExists(TEMP_USAGE)) {
+            tempUsage.set(msg.getLong(TEMP_USAGE));
+        }
+    }
+
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollector.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollector.java
new file mode 100644
index 0000000000..e83adab205
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollector.java
@@ -0,0 +1,35 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import org.apache.james.lifecycle.api.Startable;
+import org.apache.james.queue.api.MailQueueName;
+
+public interface ActiveMQMetricCollector extends Startable {
+
+    void start();
+
+    void stop();
+
+    void collectBrokerStatistics();
+
+    void collectQueueStatistics(MailQueueName name);
+
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
new file mode 100644
index 0000000000..1585969abf
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorImpl.java
@@ -0,0 +1,178 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PreDestroy;
+import javax.inject.Inject;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+public class ActiveMQMetricCollectorImpl implements ActiveMQMetricCollector {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQMetricCollectorImpl.class);
+
+    public static final Duration REFRESH_DELAY = Duration.ofSeconds(2);
+    public static final Duration REFRESH_INTERVAL = Duration.ofSeconds(5);
+    public static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(1);
+    public static final Duration REFRESH_TIMEOUT = RECEIVE_TIMEOUT.multipliedBy(2);
+
+    private final ConnectionFactory connectionFactory;
+    private final MetricFactory metricFactory;
+    private final GaugeRegistry gaugeRegistry;
+
+    private final Map<String, ActiveMQStatistics> registeredStatistics = new HashMap<>();
+
+    private Disposable disposable;
+
+    @Inject
+    public ActiveMQMetricCollectorImpl(ConnectionFactory connectionFactory, MetricFactory metricFactory, GaugeRegistry gaugeRegistry) {
+        this.connectionFactory = connectionFactory;
+        this.metricFactory = metricFactory;
+        this.gaugeRegistry = gaugeRegistry;
+    }
+
+    @Override
+    public void collectBrokerStatistics() {
+        collectStatistics(new ActiveMQBrokerStatistics());
+    }
+
+    @Override
+    public void collectQueueStatistics(MailQueueName name) {
+        collectStatistics(ActiveMQQueueStatistics.from(name.asString()));
+    }
+
+    private void collectStatistics(ActiveMQStatistics statistics) {
+        if (!registeredStatistics.containsKey(statistics.getName())) {
+            LOGGER.info("collecting statistics for {}", statistics.getName());
+            registeredStatistics.put(statistics.getName(), statistics);
+            statistics.registerMetrics(gaugeRegistry);
+        }
+    }
+
+    @Override
+    public void start() {
+        collectBrokerStatistics();
+
+        LOGGER.info("start delay={} interval={}", REFRESH_DELAY, REFRESH_INTERVAL);
+        disposable = Flux.interval(REFRESH_DELAY, REFRESH_INTERVAL)
+            .flatMap(any -> Flux.fromStream(() -> registeredStatistics.values().stream())
+                .flatMap((s) -> {
+                    Mono<Void> task = Mono.fromCallable(() -> fetchAndUpdate(s)).timeout(REFRESH_TIMEOUT);
+                    return metricFactory.decoratePublisherWithTimerMetric(s.getName() + "._time", task);
+                })
+            )
+            .onErrorContinue(this::logError)
+            .subscribeOn(Schedulers.newSingle(ActiveMQMetricCollectorImpl.class.getSimpleName()))
+            .subscribe();
+    }
+
+    @Override
+    @PreDestroy
+    public void stop() {
+        disposable.dispose();
+    }
+
+    private Void logError(Throwable error, Object triggeringValue) {
+        LOGGER.warn("failed to fetch and update broker statistics", error);
+        return null;
+    }
+
+    @VisibleForTesting
+    Void fetchAndUpdate(ActiveMQStatistics stats) throws JMSException {
+        Connection connection = null;
+        Session session = null;
+        TemporaryQueue replyTo = null;
+        MessageConsumer consumer = null;
+        MessageProducer producer = null;
+        try {
+            connection = connectionFactory.createConnection();
+            connection.start();
+            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            replyTo = session.createTemporaryQueue();
+            consumer = session.createConsumer(replyTo);
+
+            Queue testQueue = session.createQueue(stats.getName());
+            producer = session.createProducer(testQueue);
+            Message msg = session.createMessage();
+            msg.setJMSReplyTo(replyTo);
+            producer.send(msg);
+
+            Message reply = consumer.receive(RECEIVE_TIMEOUT.toMillis());
+            if (reply == null) {
+                throw new JMSException("no message received, timed out after " + RECEIVE_TIMEOUT);
+            } else if (!(reply instanceof MapMessage)) {
+                throw new JMSException("expected MapMessage but got " + reply.getClass());
+            }
+            stats.update((MapMessage)reply);
+        } finally {
+            if (producer != null) {
+                producer.close();
+            }
+
+            if (consumer != null) {
+                consumer.close();
+            }
+
+            if (replyTo != null) {
+                // Delete the temporary queue explicitly to free up memory.
+                // If that is not done and a pool is used, it is possible
+                // that a new MBean is registered in JMX for every
+                // TemporaryQueue and never gets unregistered.
+                replyTo.delete();
+            }
+
+            if (session != null) {
+                session.close();
+            }
+
+            if (connection != null) {
+                connection.close();
+            }
+        }
+        return null;
+    }
+
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorNoop.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorNoop.java
new file mode 100644
index 0000000000..b5343cf6b3
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorNoop.java
@@ -0,0 +1,41 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import org.apache.james.queue.api.MailQueueName;
+
+public class ActiveMQMetricCollectorNoop implements ActiveMQMetricCollector {
+
+    @Override
+    public void start() {
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public void collectBrokerStatistics() {
+    }
+
+    @Override
+    public void collectQueueStatistics(MailQueueName name) {
+    }
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQQueueStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQQueueStatistics.java
new file mode 100644
index 0000000000..f92c99abb0
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQQueueStatistics.java
@@ -0,0 +1,155 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+public class ActiveMQQueueStatistics implements ActiveMQStatistics {
+    protected static final String MEMORY_LIMIT = "memoryLimit";
+
+    protected static final String SIZE = "size";
+    protected static final String ENQUEUE_COUNT = "enqueueCount";
+    protected static final String DEQUEUE_COUNT = "dequeueCount";
+    protected static final String INFLIGHT_COUNT = "inflightCount";
+    protected static final String PRODUCER_COUNT = "producerCount";
+    protected static final String CONSUMER_COUNT = "consumerCount";
+    protected static final String EXPIRED_COUNT = "expiredCount";
+    protected static final String DISPATCH_COUNT = "dispatchCount";
+    protected static final String MESSAGES_CACHED = "messagesCached";
+
+    protected static final String MIN_ENQUEUE_TIME = "minEnqueueTime";
+    protected static final String MAX_ENQUEUE_TIME = "maxEnqueueTime";
+    protected static final String AVERAGE_ENQUEUE_TIME = "averageEnqueueTime";
+
+    protected static final String LAST_UPDATE = "lastUpdate";
+
+    private final AtomicLong memoryLimit = new AtomicLong();
+
+    private final AtomicLong size = new AtomicLong();
+    private final AtomicLong enqueueCount = new AtomicLong();
+    private final AtomicLong dequeueCount = new AtomicLong();
+    private final AtomicLong inflightCount = new AtomicLong();
+    private final AtomicLong producerCount = new AtomicLong();
+    private final AtomicLong consumerCount = new AtomicLong();
+    private final AtomicLong expiredCount = new AtomicLong();
+    private final AtomicLong dispatchCount = new AtomicLong();
+    private final AtomicLong messagesCached = new AtomicLong();
+
+    private final AtomicDouble minEnqueueTime = new AtomicDouble();
+    private final AtomicDouble maxEnqueueTime = new AtomicDouble();
+    private final AtomicDouble averageEnqueueTime = new AtomicDouble();
+
+    private final AtomicLong lastUpdate = new AtomicLong();
+
+    protected final String statsQueueName;
+
+    public static ActiveMQQueueStatistics from(String queueName) {
+        return new ActiveMQQueueStatistics("ActiveMQ.Statistics.Destination." + queueName);
+    }
+
+    public ActiveMQQueueStatistics(String statsQueueName) {
+        this.statsQueueName = statsQueueName;
+    }
+
+    @Override
+    public String getName() {
+        return statsQueueName;
+    }
+
+    @Override
+    public void registerMetrics(GaugeRegistry gaugeRegistry) {
+        String prefix = statsQueueName + ".";
+
+        gaugeRegistry.register(prefix + MEMORY_LIMIT, memoryLimit::get);
+
+        gaugeRegistry.register(prefix + SIZE, size::get);
+        gaugeRegistry.register(prefix + ENQUEUE_COUNT, enqueueCount::get);
+        gaugeRegistry.register(prefix + DEQUEUE_COUNT, dequeueCount::get);
+        gaugeRegistry.register(prefix + INFLIGHT_COUNT, inflightCount::get);
+        gaugeRegistry.register(prefix + PRODUCER_COUNT, producerCount::get);
+        gaugeRegistry.register(prefix + CONSUMER_COUNT, consumerCount::get);
+        gaugeRegistry.register(prefix + EXPIRED_COUNT, expiredCount::get);
+        gaugeRegistry.register(prefix + DISPATCH_COUNT, dispatchCount::get);
+        gaugeRegistry.register(prefix + MESSAGES_CACHED, messagesCached::get);
+
+        gaugeRegistry.register(prefix + MIN_ENQUEUE_TIME, minEnqueueTime::get);
+        gaugeRegistry.register(prefix + MAX_ENQUEUE_TIME, maxEnqueueTime::get);
+        gaugeRegistry.register(prefix + AVERAGE_ENQUEUE_TIME, averageEnqueueTime::get);
+
+        gaugeRegistry.register(prefix + LAST_UPDATE, lastUpdate::get);
+    }
+
+    @Override
+    public void update(MapMessage msg) throws JMSException {
+        if (msg.itemExists(MEMORY_LIMIT)) {
+            memoryLimit.set(msg.getLong(MEMORY_LIMIT));
+        }
+        if (msg.itemExists(SIZE)) {
+            size.set(msg.getLong(SIZE));
+        }
+        if (msg.itemExists(ENQUEUE_COUNT)) {
+            enqueueCount.set(msg.getLong(ENQUEUE_COUNT));
+        }
+        if (msg.itemExists(DEQUEUE_COUNT)) {
+            dequeueCount.set(msg.getLong(DEQUEUE_COUNT));
+        }
+        if (msg.itemExists(INFLIGHT_COUNT)) {
+            inflightCount.set(msg.getLong(INFLIGHT_COUNT));
+        }
+        if (msg.itemExists(PRODUCER_COUNT)) {
+            producerCount.set(msg.getLong(PRODUCER_COUNT));
+        }
+        if (msg.itemExists(CONSUMER_COUNT)) {
+            consumerCount.set(msg.getLong(CONSUMER_COUNT));
+        }
+        if (msg.itemExists(EXPIRED_COUNT)) {
+            expiredCount.set(msg.getLong(EXPIRED_COUNT));
+        }
+        if (msg.itemExists(DISPATCH_COUNT)) {
+            dispatchCount.set(msg.getLong(DISPATCH_COUNT));
+        }
+        if (msg.itemExists(MESSAGES_CACHED)) {
+            messagesCached.set(msg.getLong(MESSAGES_CACHED));
+        }
+
+        if (msg.itemExists(MIN_ENQUEUE_TIME)) {
+            minEnqueueTime.set(msg.getDouble(MIN_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(MAX_ENQUEUE_TIME)) {
+            maxEnqueueTime.set(msg.getDouble(MAX_ENQUEUE_TIME));
+        }
+        if (msg.itemExists(AVERAGE_ENQUEUE_TIME)) {
+            averageEnqueueTime.set(msg.getDouble(AVERAGE_ENQUEUE_TIME));
+        }
+
+        lastUpdate.set(System.currentTimeMillis());
+    }
+
+    public long getLastUpdate() {
+        return lastUpdate.get();
+    }
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQStatistics.java
new file mode 100644
index 0000000000..52d10504e9
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/metric/ActiveMQStatistics.java
@@ -0,0 +1,34 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+
+public interface ActiveMQStatistics {
+
+    String getName();
+
+    void registerMetrics(GaugeRegistry gaugeRegistry);
+
+    void update(MapMessage msg) throws JMSException;
+}
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
index aef9d9a499..ee3ecfd708 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
@@ -26,16 +26,13 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.james.core.healthcheck.Result;
-import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.queue.jms.BrokerExtension;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 
 import reactor.core.publisher.Mono;
 
-@Tag(BrokerExtension.STATISTICS)
 @ExtendWith(BrokerExtension.class)
 class ActiveMQHealthCheckTest {
 
@@ -50,7 +47,7 @@ class ActiveMQHealthCheckTest {
         prefetchPolicy.setQueuePrefetch(0);
         connectionFactory.setPrefetchPolicy(prefetchPolicy);
 
-        testee = new ActiveMQHealthCheck(connectionFactory, new NoopGaugeRegistry());
+        testee = new ActiveMQHealthCheck(connectionFactory);
     }
 
     @Test
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactoryTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactoryTest.java
index 5b1e92784b..447d17a740 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactoryTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQMailQueueFactoryTest.java
@@ -25,6 +25,8 @@ import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.james.metrics.api.NoopGaugeRegistry;
 import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.apache.james.queue.activemq.metric.ActiveMQMetricCollector;
+import org.apache.james.queue.activemq.metric.ActiveMQMetricCollectorNoop;
 import org.apache.james.queue.api.MailQueueFactory;
 import org.apache.james.queue.api.MailQueueFactoryContract;
 import org.apache.james.queue.api.ManageableMailQueue;
@@ -49,7 +51,8 @@ public class ActiveMQMailQueueFactoryTest {
             RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory();
             RecordingMetricFactory metricFactory = new RecordingMetricFactory();
             NoopGaugeRegistry gaugeRegistry = new NoopGaugeRegistry();
-            mailQueueFactory = new ActiveMQMailQueueFactory(connectionFactory, mailQueueItemDecoratorFactory, metricFactory, gaugeRegistry);
+            ActiveMQMetricCollector metricCollector = new ActiveMQMetricCollectorNoop();
+            mailQueueFactory = new ActiveMQMailQueueFactory(connectionFactory, mailQueueItemDecoratorFactory, metricFactory, gaugeRegistry, metricCollector);
             mailQueueFactory.setUseJMX(false);
             mailQueueFactory.setUseBlobMessages(false);
         }
@@ -88,7 +91,8 @@ public class ActiveMQMailQueueFactoryTest {
             RawMailQueueItemDecoratorFactory mailQueueItemDecoratorFactory = new RawMailQueueItemDecoratorFactory();
             RecordingMetricFactory metricFactory = new RecordingMetricFactory();
             NoopGaugeRegistry gaugeRegistry = new NoopGaugeRegistry();
-            mailQueueFactory = new ActiveMQMailQueueFactory(connectionFactory, mailQueueItemDecoratorFactory, metricFactory, gaugeRegistry);
+            ActiveMQMetricCollector metricCollector = new ActiveMQMetricCollectorNoop();
+            mailQueueFactory = new ActiveMQMailQueueFactory(connectionFactory, mailQueueItemDecoratorFactory, metricFactory, gaugeRegistry, metricCollector);
             mailQueueFactory.setUseJMX(false);
             mailQueueFactory.setUseBlobMessages(true);
         }
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
new file mode 100644
index 0000000000..f8a00e89cf
--- /dev/null
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/metric/ActiveMQMetricCollectorTest.java
@@ -0,0 +1,131 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq.metric;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.james.metrics.api.Gauge;
+import org.apache.james.metrics.api.GaugeRegistry;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
+import org.apache.james.metrics.tests.RecordingMetricFactory;
+import org.apache.james.queue.api.MailQueueName;
+import org.apache.james.queue.jms.BrokerExtension;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(BrokerExtension.class)
+@Tag(BrokerExtension.STATISTICS)
+class ActiveMQMetricCollectorTest {
+
+    private static ActiveMQConnectionFactory connectionFactory;
+    //private BrokerService broker;
+
+    @BeforeAll
+    static void setup(BrokerService broker) {
+        //this.broker = broker;
+        connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
+        ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
+        prefetchPolicy.setQueuePrefetch(0);
+        connectionFactory.setPrefetchPolicy(prefetchPolicy);
+    }
+
+    @Test
+    void shouldFailToFetchAndUpdateStatisticsForUnknownQueue() {
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), new NoopGaugeRegistry());
+        ActiveMQQueueStatistics queueStatistics = new ActiveMQQueueStatistics("UNKNOWN");
+
+        assertThatThrownBy(() -> testee.fetchAndUpdate(queueStatistics))
+            .isInstanceOf(JMSException.class);
+        assertThat(queueStatistics.getLastUpdate())
+            .isEqualTo(0);
+    }
+
+    @Test
+    void shouldFetchAndUpdateBrokerStatistics() throws Exception {
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), new NoopGaugeRegistry());
+        ActiveMQBrokerStatistics brokerStatistics = new ActiveMQBrokerStatistics();
+
+        long notBefore = System.currentTimeMillis();
+        testee.fetchAndUpdate(brokerStatistics);
+        assertThat(brokerStatistics.getLastUpdate())
+            .isGreaterThanOrEqualTo(notBefore);
+    }
+
+    @Test
+    void shouldFetchAndUpdateBrokerStatisticsInGaugeRegistry() throws Exception {
+        SimpleGaugeRegistry gaugeRegistry = new SimpleGaugeRegistry();
+        ActiveMQMetricCollectorImpl testee = new ActiveMQMetricCollectorImpl(connectionFactory, new RecordingMetricFactory(), gaugeRegistry);
+        ActiveMQBrokerStatistics brokerStatistics = new ActiveMQBrokerStatistics();
+        brokerStatistics.registerMetrics(gaugeRegistry);
+
+        testee.fetchAndUpdate(brokerStatistics);
+
+        Supplier<?> supplier = gaugeRegistry.getGauge("ActiveMQ.Statistics.Broker.storeLimit");
+        assertThat(supplier.get()).isInstanceOf(Long.class);
+        assertThat((Long) supplier.get()).isGreaterThan(0);
+    }
+
+    @Test
+    void hasExecutionTimeMetrics() {
+        RecordingMetricFactory metricFactory = new RecordingMetricFactory();
+        NoopGaugeRegistry gaugeRegistry = new NoopGaugeRegistry();
+        ActiveMQMetricCollector testee = new ActiveMQMetricCollectorImpl(connectionFactory, metricFactory, gaugeRegistry);
+        testee.start();
+        testee.collectBrokerStatistics();
+        testee.collectQueueStatistics(MailQueueName.of("UNKNOWN"));
+
+        Integer executionTimeCount = Flux.interval(ActiveMQMetricCollectorImpl.REFRESH_DELAY, Duration.ofSeconds(1))
+            .take(3, true)
+            .flatMap(n -> Mono.fromCallable(() -> metricFactory.executionTimesForPrefixName("ActiveMQ.").size()))
+            .blockLast();
+        assertThat(executionTimeCount).isNotNull().isNotZero();
+
+        testee.stop();
+    }
+
+    private class SimpleGaugeRegistry implements GaugeRegistry {
+        private final Map<String, Gauge<?>> gauges = new ConcurrentHashMap<>();
+
+        @Override
+        public <T> GaugeRegistry register(String name, Gauge<T> gauge) {
+            gauges.put(name, gauge);
+            return this;
+        }
+
+        public Gauge<?> getGauge(String name) {
+            return gauges.get(name);
+        }
+    }
+}
+
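
The collector in this commit registers each statistics object once and then refreshes it from a scheduler thread. The following is a minimal, broker-free sketch of that register-once pattern, not the committed implementation. It assumes a ConcurrentHashMap with putIfAbsent in place of the HashMap and containsKey check used in the diff, and the names StatisticsSketch and RegistrySketch are hypothetical. Plain maps stand in for MapMessage and GaugeRegistry so the example runs without JMS.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical stand-in for an ActiveMQStatistics implementation (not James code).
final class StatisticsSketch {
    private final String name;
    private final AtomicLong size = new AtomicLong();
    private final AtomicLong lastUpdate = new AtomicLong();

    StatisticsSketch(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }

    // Exposes each value as a supplier under "<name>.<key>", similar to a gauge.
    void registerMetrics(Map<String, Supplier<?>> gauges) {
        gauges.put(name + ".size", size::get);
        gauges.put(name + ".lastUpdate", lastUpdate::get);
    }

    // Copies only the keys present in the reply, then stamps the update time.
    void update(Map<String, ?> reply) {
        Object value = reply.get("size");
        if (value instanceof Number) {
            size.set(((Number) value).longValue());
        }
        lastUpdate.set(System.currentTimeMillis());
    }
}

// Hypothetical registry: safe to call from one thread while another iterates.
final class RegistrySketch {
    private final Map<String, StatisticsSketch> registered = new ConcurrentHashMap<>();
    private final Map<String, Supplier<?>> gauges = new ConcurrentHashMap<>();

    // Returns true only for the first registration of a given name.
    boolean collect(StatisticsSketch statistics) {
        if (registered.putIfAbsent(statistics.getName(), statistics) != null) {
            return false;
        }
        statistics.registerMetrics(gauges);
        return true;
    }

    Supplier<?> gauge(String name) {
        return gauges.get(name);
    }
}
```

Registering the same name twice is a no-op in this sketch, which mirrors how collectBrokerStatistics() can be called both from start() and by a caller without duplicating gauges.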


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/08: [JAMES-3841] ActiveMQ: fix test for JamesAppSpringMainTest

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 83b5b660d5cc36f8e58ed7d3c38747ce84bb16f4
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Mon Nov 28 22:46:20 2022 +0100

    [JAMES-3841] ActiveMQ: fix test for JamesAppSpringMainTest
---
 .../src/main/resources/META-INF/spring/activemq-queue-context.xml      | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/server/queue/queue-activemq/src/main/resources/META-INF/spring/activemq-queue-context.xml b/server/queue/queue-activemq/src/main/resources/META-INF/spring/activemq-queue-context.xml
index b82e377b70..f7406da0bd 100644
--- a/server/queue/queue-activemq/src/main/resources/META-INF/spring/activemq-queue-context.xml
+++ b/server/queue/queue-activemq/src/main/resources/META-INF/spring/activemq-queue-context.xml
@@ -26,6 +26,7 @@
         <constructor-arg index="1" ref="rawMailQueueItemDecoratorFactory"/>
         <constructor-arg index="2" ref="metricFactory"/>
         <constructor-arg index="3" ref="gaugeRegistry"/>
+        <constructor-arg index="4" ref="activeMQMetricCollector"/>
     </bean>
 
     <bean id="rawMailQueueItemDecoratorFactory" class="org.apache.james.queue.api.RawMailQueueItemDecoratorFactory"/>
@@ -36,4 +37,6 @@
     </bean>
 
     <bean id="persistenceAdapter" class="org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter"/>
+
+    <bean id="activeMQMetricCollector" class="org.apache.james.queue.activemq.metric.ActiveMQMetricCollectorNoop"/>
 </beans>


[james-project] 07/08: [JAMES-3841] ActiveMQ: add sample configuration for conf/activemq.properties

Posted by rc...@apache.org.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit fee5c9033879aaaef9bba6fcf58141d7b216e20a
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Wed Nov 30 07:48:16 2022 +0100

    [JAMES-3841] ActiveMQ: add sample configuration for conf/activemq.properties
---
 .../sample-configuration/activemq.properties           | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/server/apps/cassandra-app/sample-configuration/activemq.properties b/server/apps/cassandra-app/sample-configuration/activemq.properties
new file mode 100644
index 0000000000..99e3b68368
--- /dev/null
+++ b/server/apps/cassandra-app/sample-configuration/activemq.properties
@@ -0,0 +1,18 @@
+# Configuration file for ActiveMQ AMQP
+
+#
+# Metric collection (see https://activemq.apache.org/statisticsplugin.html)
+# - broker statistics
+# - per-queue statistics (e.g. spool, outgoing)
+#
+
+# collect ActiveMQ metrics (i.e. broker statistics, per-queue statistics)
+metrics.enabled=false
+# add start delay for metric collector (must be greater than 1s)
+metrics.start_delay=10s
+# interval between metric collections (must be greater than 5s)
+metrics.interval=30s
+# timeout for a single metric collection (must be less than metrics.interval)
+metrics.timeout=2s
+# ActiveMQ receive timeout (must be less than metrics.timeout)
+metrics.aqmp_timeout=1s
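
The comments in the sample file state four ordering constraints between the durations. A minimal sketch of how such constraints could be checked when loading the file follows. It is not the ActiveMQConfiguration class from this change set: the class MetricSettingsSketch, its parse helper (which accepts only the "<n>s" and "<n>ms" forms) and the use of the sample values as defaults are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Properties;

// Hypothetical helper; the property keys are taken from the sample file above.
final class MetricSettingsSketch {
    final Duration startDelay;
    final Duration interval;
    final Duration timeout;
    final Duration aqmpTimeout;

    private MetricSettingsSketch(Duration startDelay, Duration interval, Duration timeout, Duration aqmpTimeout) {
        this.startDelay = startDelay;
        this.interval = interval;
        this.timeout = timeout;
        this.aqmpTimeout = aqmpTimeout;
    }

    // Reads the four durations and enforces the constraints from the comments.
    static MetricSettingsSketch from(Properties props) {
        Duration startDelay = parse(props.getProperty("metrics.start_delay", "10s"));
        Duration interval = parse(props.getProperty("metrics.interval", "30s"));
        Duration timeout = parse(props.getProperty("metrics.timeout", "2s"));
        Duration aqmpTimeout = parse(props.getProperty("metrics.aqmp_timeout", "1s"));

        check(startDelay.compareTo(Duration.ofSeconds(1)) > 0, "metrics.start_delay must be greater than 1s");
        check(interval.compareTo(Duration.ofSeconds(5)) > 0, "metrics.interval must be greater than 5s");
        check(timeout.compareTo(interval) < 0, "metrics.timeout must be less than metrics.interval");
        check(aqmpTimeout.compareTo(timeout) < 0, "metrics.aqmp_timeout must be less than metrics.timeout");

        return new MetricSettingsSketch(startDelay, interval, timeout, aqmpTimeout);
    }

    // Assumption: only whole seconds ("10s") and milliseconds ("500ms") are supported here.
    static Duration parse(String raw) {
        String value = raw.trim();
        if (value.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(value.substring(0, value.length() - 2).trim()));
        }
        if (value.endsWith("s")) {
            return Duration.ofSeconds(Long.parseLong(value.substring(0, value.length() - 1).trim()));
        }
        throw new IllegalArgumentException("unsupported duration: " + raw);
    }

    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

Failing fast with an IllegalArgumentException keeps a misconfigured timeout from silently producing a collector whose refresh can never complete within its interval.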

