You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/31 13:04:27 UTC
[activemq-artemis] branch main updated: ARTEMIS-4064 Harden MetricsManager
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new ccb76c0a2a ARTEMIS-4064 Harden MetricsManager
ccb76c0a2a is described below
commit ccb76c0a2a1055e83ddfdbbda5049b97b2389b16
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Oct 20 13:47:18 2022 -0500
ARTEMIS-4064 Harden MetricsManager
To make troubleshooting the MetricsManager easier, there should
be additional logging and exceptions. In all, this commit contains the
following changes:
- Additional logging
- Throw an exception when registering meters for a resource that already has meters registered
- Rename a few variables & methods to more clearly identify what they
are used for
- Upgrade Micrometer to 1.9.5
- Simplify/clarify a few blocks of code
- No longer pass the ManagementServiceImpl when registering the
metrics, but instead pass the Object the meter is observing (e.g.
broker, address, or queue)
---
.../artemis/core/server/ActiveMQMessageBundle.java | 3 +
.../management/impl/ManagementServiceImpl.java | 58 ++++++++---------
.../core/server/metrics/MetricsManager.java | 72 ++++++++++------------
pom.xml | 2 +-
4 files changed, 67 insertions(+), 68 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 9ca0a1127d..aadb9ce633 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -521,4 +521,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229243, value = "Embedded web server restart failed")
ActiveMQException embeddedWebServerRestartFailed(Exception e);
+
+ @Message(id = 229244, value = "Meters already registered for {}")
+ IllegalStateException metersAlreadyRegistered(String resource);
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index d7c3a3342f..d2f6dbee3a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -227,11 +227,11 @@ public class ManagementServiceImpl implements ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerBrokerGauge(builder -> {
- builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
- builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
- builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
- builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
- builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
+ builder.build(BrokerMetricNames.CONNECTION_COUNT, messagingServer, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
+ builder.build(BrokerMetricNames.TOTAL_CONNECTION_COUNT, messagingServer, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
+ builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE, messagingServer, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
+ builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, messagingServer, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
+ builder.build(BrokerMetricNames.DISK_STORE_USAGE, messagingServer, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
});
}
}
@@ -266,10 +266,10 @@ public class ManagementServiceImpl implements ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerAddressGauge(addressInfo.getName().toString(), builder -> {
- builder.register(AddressMetricNames.ROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(addressInfo.getRoutedMessageCount()), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(AddressMetricNames.UNROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(addressInfo.getUnRoutedMessageCount()), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(AddressMetricNames.ADDRESS_SIZE, this, metrics -> Double.valueOf(addressControl.getAddressSize()), AddressControl.ADDRESS_SIZE_DESCRIPTION);
- builder.register(AddressMetricNames.PAGES_COUNT, this, metrics -> Double.valueOf(addressControl.getNumberOfPages()), AddressControl.NUMBER_OF_PAGES_DESCRIPTION);
+ builder.build(AddressMetricNames.ROUTED_MESSAGE_COUNT, addressInfo, metrics -> Double.valueOf(addressInfo.getRoutedMessageCount()), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(AddressMetricNames.UNROUTED_MESSAGE_COUNT, addressInfo, metrics -> Double.valueOf(addressInfo.getUnRoutedMessageCount()), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(AddressMetricNames.ADDRESS_SIZE, addressInfo, metrics -> Double.valueOf(addressControl.getAddressSize()), AddressControl.ADDRESS_SIZE_DESCRIPTION);
+ builder.build(AddressMetricNames.PAGES_COUNT, addressInfo, metrics -> Double.valueOf(addressControl.getNumberOfPages()), AddressControl.NUMBER_OF_PAGES_DESCRIPTION);
});
}
}
@@ -330,26 +330,26 @@ public class ManagementServiceImpl implements ManagementService {
MetricsManager metricsManager = messagingServer.getMetricsManager();
if (metricsManager != null) {
metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> {
- builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
- builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
- builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
- builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
- builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.DURABLE_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
+ builder.build(QueueMetricNames.DURABLE_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
+
+ builder.build(QueueMetricNames.DELIVERING_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
+ builder.build(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
+
+ builder.build(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+ builder.build(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
+ builder.build(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
+
+ builder.build(QueueMetricNames.MESSAGES_ACKNOWLEDGED, queue, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGES_ADDED, queue, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGES_KILLED, queue, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
+ builder.build(QueueMetricNames.MESSAGES_EXPIRED, queue, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
+ builder.build(QueueMetricNames.CONSUMER_COUNT, queue, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
});
}
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
index d9d19984e8..e5c84d48df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
@@ -34,6 +34,7 @@ import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import io.netty.buffer.PooledByteBufAllocator;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.slf4j.Logger;
@@ -80,15 +81,14 @@ public class MetricsManager {
@FunctionalInterface
public interface MetricGaugeBuilder {
- void register(String metricName, Object state, ToDoubleFunction f, String description);
+ void build(String metricName, Object state, ToDoubleFunction f, String description);
}
public void registerQueueGauge(String address, String queue, Consumer<MetricGaugeBuilder> builder) {
- final MeterRegistry meterRegistry = this.meterRegistry;
- if (meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
+ if (this.meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
return;
}
- final List<Gauge.Builder> newMeters = new ArrayList<>();
+ final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
@@ -96,70 +96,66 @@ public class MetricsManager {
.tag("address", address)
.tag("queue", queue)
.description(description);
- newMeters.add(meter);
+ gaugeBuilders.add(meter);
});
- final String resource = ResourceNames.QUEUE + queue;
- registerMeter(newMeters, resource);
+ registerMeters(gaugeBuilders, ResourceNames.QUEUE + queue);
}
public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) {
- final MeterRegistry meterRegistry = this.meterRegistry;
- if (meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
+ if (this.meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
return;
}
- final List<Gauge.Builder> newMeters = new ArrayList<>();
+ final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("broker", brokerName)
.tag("address", address)
.description(description);
- newMeters.add(meter);
+ gaugeBuilders.add(meter);
});
- final String resource = ResourceNames.ADDRESS + address;
- registerMeter(newMeters, resource);
+ registerMeters(gaugeBuilders, ResourceNames.ADDRESS + address);
}
public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) {
- final MeterRegistry meterRegistry = this.meterRegistry;
- if (meterRegistry == null) {
+ if (this.meterRegistry == null) {
return;
}
- final List<Gauge.Builder> newMeters = new ArrayList<>();
+ final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
builder.accept((metricName, state, f, description) -> {
Gauge.Builder meter = Gauge
.builder("artemis." + metricName, state, f)
.tag("broker", brokerName)
.description(description);
- newMeters.add(meter);
+ gaugeBuilders.add(meter);
});
- final String resource = ResourceNames.BROKER + "." + brokerName;
- registerMeter(newMeters, resource);
+ registerMeters(gaugeBuilders, ResourceNames.BROKER + "." + brokerName);
}
- private void registerMeter(List<Gauge.Builder> newMeters, String resource) {
- this.meters.compute(resource, (s, meters) -> {
- //the old meters are ignored on purpose
- meters = new ArrayList<>(newMeters.size());
- for (Gauge.Builder gaugeBuilder : newMeters) {
- Gauge gauge = gaugeBuilder.register(meterRegistry);
- meters.add(gauge);
- logger.debug("Registered meter: {}", gauge.getId());
- }
- return meters;
- });
+ private void registerMeters(List<Gauge.Builder> gaugeBuilders, String resource) {
+ if (meters.get(resource) != null) {
+ throw ActiveMQMessageBundle.BUNDLE.metersAlreadyRegistered(resource);
+ }
+ logger.debug("Registering meters for {}", resource);
+ List<Meter> newMeters = new ArrayList<>(gaugeBuilders.size());
+ for (Gauge.Builder gaugeBuilder : gaugeBuilders) {
+ Gauge gauge = gaugeBuilder.register(meterRegistry);
+ newMeters.add(gauge);
+ logger.debug("Registered meter: {}", gauge.getId());
+ }
+ meters.put(resource, newMeters);
}
- public void remove(String component) {
- meters.computeIfPresent(component, (s, meters) -> {
- if (meters == null) {
- return null;
- }
- for (Meter meter : meters) {
+ public void remove(String resource) {
+ List<Meter> resourceMeters = meters.remove(resource);
+ if (resourceMeters != null) {
+ logger.debug("Unregistering meters for {}", resource);
+ for (Meter meter : resourceMeters) {
Meter removed = meterRegistry.remove(meter);
logger.debug("Unregistered meter: {}", removed.getId());
}
- return null;
- });
+ } else {
+ logger.debug("Attempted to unregister meters for {}, but none were found.", resource);
+ }
}
}
diff --git a/pom.xml b/pom.xml
index 147d820bae..5ecce22dd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,7 +147,7 @@
<servicemix.json-1.1.spec.version>2.9.0</servicemix.json-1.1.spec.version>
<version.org.jacoco>0.8.8</version.org.jacoco>
<version.org.jacoco.plugin>0.8.8</version.org.jacoco.plugin>
- <version.micrometer>1.8.5</version.micrometer>
+ <version.micrometer>1.9.5</version.micrometer>
<hamcrest.version>2.1</hamcrest.version>
<junit.version>4.13.2</junit.version>
<surefire.version>2.22.2</surefire.version>