You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/12/01 05:08:54 UTC
[james-project] 01/08: [JAMES-3841] ActiveMQ: replace simple connection health check with retrieving ActiveMQ broker stats
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 80d12a2edc40596b3dc1c6c2961ac64e443e036a
Author: ouvtam <ou...@8n4.pw>
AuthorDate: Wed Nov 2 23:55:04 2022 +0100
[JAMES-3841] ActiveMQ: replace simple connection health check with retrieving ActiveMQ broker stats
---
.../queue/activemq/ActiveMQBrokerStatistics.java | 229 +++++++++++++++++++++
.../james/queue/activemq/ActiveMQHealthCheck.java | 95 ++++++++-
.../james/queue/activemq/EmbeddedActiveMQ.java | 1 +
.../queue/activemq/ActiveMQHealthCheckTest.java | 6 +-
4 files changed, 325 insertions(+), 6 deletions(-)
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
new file mode 100644
index 0000000000..e64ff380d9
--- /dev/null
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQBrokerStatistics.java
@@ -0,0 +1,229 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.queue.activemq;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.james.metrics.api.GaugeRegistry;
+
+import com.google.common.util.concurrent.AtomicDouble;
+
+public class ActiveMQBrokerStatistics {
+
+ public static final String STATS_QUEUE_NAME = "ActiveMQ.Statistics.Broker";
+
+ private static final String TEMP_PERCENT_USAGE = "tempPercentUsage";
+ private static final String STORE_PERCENT_USAGE = "storePercentUsage";
+ private static final String MEMORY_PERCENT_USAGE = "memoryPercentUsage";
+
+ private static final String STORE_LIMIT = "storeLimit";
+ private static final String MEMORY_LIMIT = "memoryLimit";
+ private static final String TEMP_LIMIT = "tempLimit";
+
+ private static final String MEMORY_USAGE = "memoryUsage";
+ private static final String STORE_USAGE = "storeUsage";
+ private static final String TEMP_USAGE = "tempUsage";
+
+ private static final String SIZE = "size";
+ private static final String ENQUEUE_COUNT = "enqueueCount";
+ private static final String DEQUEUE_COUNT = "dequeueCount";
+ private static final String INFLIGHT_COUNT = "inflightCount";
+ private static final String PRODUCER_COUNT = "producerCount";
+ private static final String CONSUMER_COUNT = "consumerCount";
+ private static final String EXPIRED_COUNT = "expiredCount";
+ private static final String DISPATCH_COUNT = "dispatchCount";
+ private static final String MESSAGES_CACHED = "messagesCached";
+
+ private static final String MIN_ENQUEUE_TIME = "minEnqueueTime";
+ private static final String MAX_ENQUEUE_TIME = "maxEnqueueTime";
+ private static final String AVERAGE_ENQUEUE_TIME = "averageEnqueueTime";
+
+ private final AtomicLong lastUpdate = new AtomicLong();
+
+ private final AtomicInteger memoryPercentUsage = new AtomicInteger();
+ private final AtomicInteger storePercentUsage = new AtomicInteger();
+ private final AtomicInteger tempPercentUsage = new AtomicInteger();
+
+ private final AtomicLong memoryLimit = new AtomicLong();
+ private final AtomicLong storeLimit = new AtomicLong();
+ private final AtomicLong tempLimit = new AtomicLong();
+
+ private final AtomicLong memoryUsage = new AtomicLong();
+ private final AtomicLong storeUsage = new AtomicLong();
+ private final AtomicLong tempUsage = new AtomicLong();
+
+ private final AtomicLong size = new AtomicLong();
+ private final AtomicLong enqueueCount = new AtomicLong();
+ private final AtomicLong dequeueCount = new AtomicLong();
+ private final AtomicLong inflightCount = new AtomicLong();
+ private final AtomicLong producerCount = new AtomicLong();
+ private final AtomicLong consumerCount = new AtomicLong();
+ private final AtomicLong expiredCount = new AtomicLong();
+ private final AtomicLong dispatchCount = new AtomicLong();
+ private final AtomicLong messagesCached = new AtomicLong();
+
+ private final AtomicDouble minEnqueueTime = new AtomicDouble();
+ private final AtomicDouble maxEnqueueTime = new AtomicDouble();
+ private final AtomicDouble averageEnqueueTime = new AtomicDouble();
+
+ public ActiveMQBrokerStatistics(GaugeRegistry gaugeRegistry) {
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_PERCENT_USAGE, memoryPercentUsage::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_PERCENT_USAGE, storePercentUsage::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_PERCENT_USAGE, tempPercentUsage::get);
+
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_LIMIT, memoryLimit::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_LIMIT, storeLimit::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_LIMIT, tempLimit::get);
+
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + MEMORY_USAGE, memoryUsage::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + STORE_USAGE, storeUsage::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + TEMP_USAGE, tempUsage::get);
+
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + SIZE, size::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + ENQUEUE_COUNT, enqueueCount::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + DEQUEUE_COUNT, dequeueCount::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + INFLIGHT_COUNT, inflightCount::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + PRODUCER_COUNT, producerCount::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + CONSUMER_COUNT, consumerCount::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + EXPIRED_COUNT, expiredCount::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + DISPATCH_COUNT, dispatchCount::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + MESSAGES_CACHED, messagesCached::get);
+
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + MIN_ENQUEUE_TIME, minEnqueueTime::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + MAX_ENQUEUE_TIME, maxEnqueueTime::get);
+ gaugeRegistry.register(STATS_QUEUE_NAME + "." + AVERAGE_ENQUEUE_TIME, averageEnqueueTime::get);
+
+ gaugeRegistry.register(STATS_QUEUE_NAME + ".lastUpdate", lastUpdate::get);
+ }
+
+ public void update(MapMessage msg) throws JMSException {
+ if (msg.itemExists(MEMORY_PERCENT_USAGE)) {
+ memoryPercentUsage.set(msg.getInt(MEMORY_PERCENT_USAGE));
+ }
+ if (msg.itemExists(STORE_PERCENT_USAGE)) {
+ storePercentUsage.set(msg.getInt(STORE_PERCENT_USAGE));
+ }
+ if (msg.itemExists(TEMP_PERCENT_USAGE)) {
+ tempPercentUsage.set(msg.getInt(TEMP_PERCENT_USAGE));
+ }
+
+ if (msg.itemExists(MEMORY_LIMIT)) {
+ memoryLimit.set(msg.getLong(MEMORY_LIMIT));
+ }
+ if (msg.itemExists(STORE_LIMIT)) {
+ storeLimit.set(msg.getLong(STORE_LIMIT));
+ }
+ if (msg.itemExists(TEMP_LIMIT)) {
+ tempLimit.set(msg.getLong(TEMP_LIMIT));
+ }
+
+ if (msg.itemExists(MEMORY_USAGE)) {
+ memoryUsage.set(msg.getLong(MEMORY_USAGE));
+ }
+ if (msg.itemExists(STORE_USAGE)) {
+ storeUsage.set(msg.getLong(STORE_USAGE));
+ }
+ if (msg.itemExists(TEMP_USAGE)) {
+ tempUsage.set(msg.getLong(TEMP_USAGE));
+ }
+
+ if (msg.itemExists(SIZE)) {
+ size.set(msg.getLong(SIZE));
+ }
+ if (msg.itemExists(ENQUEUE_COUNT)) {
+ enqueueCount.set(msg.getLong(ENQUEUE_COUNT));
+ }
+ if (msg.itemExists(DEQUEUE_COUNT)) {
+ dequeueCount.set(msg.getLong(DEQUEUE_COUNT));
+ }
+ if (msg.itemExists(INFLIGHT_COUNT)) {
+ inflightCount.set(msg.getLong(INFLIGHT_COUNT));
+ }
+ if (msg.itemExists(PRODUCER_COUNT)) {
+ producerCount.set(msg.getLong(PRODUCER_COUNT));
+ }
+ if (msg.itemExists(CONSUMER_COUNT)) {
+ consumerCount.set(msg.getLong(CONSUMER_COUNT));
+ }
+ if (msg.itemExists(EXPIRED_COUNT)) {
+ expiredCount.set(msg.getLong(EXPIRED_COUNT));
+ }
+ if (msg.itemExists(DISPATCH_COUNT)) {
+ dispatchCount.set(msg.getLong(DISPATCH_COUNT));
+ }
+ if (msg.itemExists(MESSAGES_CACHED)) {
+ messagesCached.set(msg.getLong(MESSAGES_CACHED));
+ }
+
+ if (msg.itemExists(MIN_ENQUEUE_TIME)) {
+ minEnqueueTime.set(msg.getDouble(MIN_ENQUEUE_TIME));
+ }
+ if (msg.itemExists(MAX_ENQUEUE_TIME)) {
+ maxEnqueueTime.set(msg.getDouble(MAX_ENQUEUE_TIME));
+ }
+ if (msg.itemExists(AVERAGE_ENQUEUE_TIME)) {
+ averageEnqueueTime.set(msg.getDouble(AVERAGE_ENQUEUE_TIME));
+ }
+
+ lastUpdate.set(System.currentTimeMillis());
+ }
+
+ public long getLastUpdate() {
+ return lastUpdate.get();
+ }
+
+ /*
+ vm=vm://localhost
+ - memoryUsage=0
+ - storeUsage=3330
+ - tempPercentUsage=0
+ ssl=
+ openwire=tcp://localhost:50059
+ brokerId=ID:bigmac-50057-1253605065511-0:0
+ - consumerCount=2
+ brokerName=localhost
+ - expiredCount=0
+ - dispatchCount=1
+ - maxEnqueueTime=5.0
+ - storePercentUsage=0
+ - dequeueCount=0
+ - inflightCount=1
+ - messagesCached=0
+ - tempLimit=107374182400
+ - averageEnqueueTime=5.0
+ stomp+ssl=
+ - memoryPercentUsage=0
+ - size=10
+ - tempUsage=0
+ - producerCount=1
+ - minEnqueueTime=5.0
+ dataDirectory=/Users/rajdavies/dev/projects/activemq/activemq-core/activemq-data
+ - enqueueCount=10
+ stomp=
+ - storeLimit=107374182400
+ - memoryLimit=67108864
+ */
+
+
+}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
index 9032496e8f..07eb137794 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/ActiveMQHealthCheck.java
@@ -19,14 +19,24 @@
package org.apache.james.queue.activemq;
+import java.time.Duration;
+
import javax.inject.Inject;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
import javax.jms.Session;
+import javax.jms.TemporaryQueue;
import org.apache.james.core.healthcheck.ComponentName;
import org.apache.james.core.healthcheck.HealthCheck;
import org.apache.james.core.healthcheck.Result;
+import org.apache.james.metrics.api.GaugeRegistry;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,11 +47,14 @@ import reactor.core.scheduler.Schedulers;
public class ActiveMQHealthCheck implements HealthCheck {
public static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQHealthCheck.class);
public static final ComponentName COMPONENT_NAME = new ComponentName("Embedded ActiveMQ");
+ private static final Duration RECEIVE_TIMEOUT = Duration.ofSeconds(1);
private final ConnectionFactory connectionFactory;
+ private final ActiveMQBrokerStatistics brokerStatistics;
@Inject
- public ActiveMQHealthCheck(ConnectionFactory connectionFactory) {
+ public ActiveMQHealthCheck(ConnectionFactory connectionFactory, GaugeRegistry gaugeRegistry) {
this.connectionFactory = connectionFactory;
+ this.brokerStatistics = new ActiveMQBrokerStatistics(gaugeRegistry);
}
@Override
@@ -53,15 +66,87 @@ public class ActiveMQHealthCheck implements HealthCheck {
public Publisher<Result> check() {
return Mono.fromCallable(() -> {
try {
- Connection connection = connectionFactory.createConnection();
- Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
- session.close();
+ retrieveAndUpdateBrokerStatistics();
return Result.healthy(COMPONENT_NAME);
} catch (Exception e) {
LOGGER.warn("{} is unhealthy. {}", COMPONENT_NAME.getName(), e.getMessage());
- return Result.unhealthy(COMPONENT_NAME, e.getMessage());
+ return Result.unhealthy(COMPONENT_NAME, e.toString(), e);
}
}).subscribeOn(Schedulers.boundedElastic());
}
+
+ private void retrieveAndUpdateBrokerStatistics() throws JMSException {
+ Connection connection = null;
+ Session session = null;
+ TemporaryQueue replyTo = null;
+ MessageConsumer consumer = null;
+ MessageProducer producer = null;
+ try {
+ connection = connectionFactory.createConnection();
+ connection.start();
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ replyTo = session.createTemporaryQueue();
+ consumer = session.createConsumer(replyTo);
+
+ Queue testQueue = session.createQueue(ActiveMQBrokerStatistics.STATS_QUEUE_NAME);
+ producer = session.createProducer(testQueue);
+ Message msg = session.createMessage();
+ msg.setJMSReplyTo(replyTo);
+ producer.send(msg);
+
+ Message reply = consumer.receive(RECEIVE_TIMEOUT.toMillis());
+ if (reply == null) {
+ throw new JMSException("no message received, timed out after " + RECEIVE_TIMEOUT);
+ } else if (!(reply instanceof MapMessage)) {
+ throw new JMSException("expected MapMessage but got " + reply.getClass());
+ }
+ brokerStatistics.update((MapMessage)reply);
+ } finally {
+ if (producer != null) {
+ try {
+ producer.close();
+ } catch (JMSException e) {
+ LOGGER.warn("error while closing producer", e);
+ }
+ }
+
+ if (consumer != null) {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ LOGGER.warn("error while closing consumer", e);
+ }
+ }
+
+ if (replyTo != null) {
+ try {
+ // we need to delete the temporary queue to be sure we will
+ // free up memory if thats not done and a pool is used
+ // its possible that we will register a new mbean in jmx for
+ // every TemporaryQueue which will never get unregistered
+ replyTo.delete();
+ } catch (JMSException e) {
+ LOGGER.warn("error while deleting temporary queue", e);
+ }
+ }
+
+ if (session != null) {
+ try {
+ session.close();
+ } catch (JMSException e) {
+ LOGGER.warn("error while closing session", e);
+ }
+ }
+
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (JMSException e) {
+ LOGGER.warn("error while closing connection", e);
+ }
+ }
+ }
+ }
}
diff --git a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
index ffe22248f5..b0faadfa53 100644
--- a/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
+++ b/server/queue/queue-activemq/src/main/java/org/apache/james/queue/activemq/EmbeddedActiveMQ.java
@@ -109,6 +109,7 @@ public class EmbeddedActiveMQ {
brokerService.setPersistenceAdapter(persistenceAdapter);
BrokerPlugin[] brokerPlugins = {new StatisticsBrokerPlugin()};
brokerService.setPlugins(brokerPlugins);
+ brokerService.setEnableStatistics(true);
String[] transportConnectorsURIs = {BROCKER_URI};
brokerService.setTransportConnectorURIs(transportConnectorsURIs);
brokerService.start();
diff --git a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
index f54851a487..aef9d9a499 100644
--- a/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
+++ b/server/queue/queue-activemq/src/test/java/org/apache/james/queue/activemq/ActiveMQHealthCheckTest.java
@@ -26,15 +26,19 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.james.core.healthcheck.Result;
+import org.apache.james.metrics.api.NoopGaugeRegistry;
import org.apache.james.queue.jms.BrokerExtension;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import reactor.core.publisher.Mono;
+@Tag(BrokerExtension.STATISTICS)
@ExtendWith(BrokerExtension.class)
class ActiveMQHealthCheckTest {
+
private ActiveMQHealthCheck testee;
private BrokerService broker;
@@ -46,7 +50,7 @@ class ActiveMQHealthCheckTest {
prefetchPolicy.setQueuePrefetch(0);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
- testee = new ActiveMQHealthCheck(connectionFactory);
+ testee = new ActiveMQHealthCheck(connectionFactory, new NoopGaugeRegistry());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org