You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2022/10/31 13:04:27 UTC

[activemq-artemis] branch main updated: ARTEMIS-4064 Harden MetricsManager

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new ccb76c0a2a ARTEMIS-4064 Harden MetricsManager
ccb76c0a2a is described below

commit ccb76c0a2a1055e83ddfdbbda5049b97b2389b16
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Thu Oct 20 13:47:18 2022 -0500

    ARTEMIS-4064 Harden MetricsManager
    
    In order to improve trouble-shooting for the MetricsManager there should
    be additional logging and exceptions. In all, this commit contains the
    following changes:
    
     - Additional logging
     - Throw an exception when registering meters if meters already exist
     - Rename a few variables & methods to more clearly identify what they
       are used for
     - Upgrade Micrometer to 1.9.5
     - Simplify/clarify a few blocks of code
     - No longer pass the ManagementServiceImpl when registering the
       metrics, but instead pass the Object the meter is observing (e.g.
       broker, address, or queue)
---
 .../artemis/core/server/ActiveMQMessageBundle.java |  3 +
 .../management/impl/ManagementServiceImpl.java     | 58 ++++++++---------
 .../core/server/metrics/MetricsManager.java        | 72 ++++++++++------------
 pom.xml                                            |  2 +-
 4 files changed, 67 insertions(+), 68 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 9ca0a1127d..aadb9ce633 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -521,4 +521,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 229243, value = "Embedded web server restart failed")
    ActiveMQException embeddedWebServerRestartFailed(Exception e);
+
+   @Message(id = 229244, value = "Meters already registered for {}")
+   IllegalStateException metersAlreadyRegistered(String resource);
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index d7c3a3342f..d2f6dbee3a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -227,11 +227,11 @@ public class ManagementServiceImpl implements ManagementService {
       MetricsManager metricsManager = messagingServer.getMetricsManager();
       if (metricsManager != null) {
          metricsManager.registerBrokerGauge(builder -> {
-            builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
-            builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
-            builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
-            builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
-            builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
+            builder.build(BrokerMetricNames.CONNECTION_COUNT, messagingServer, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
+            builder.build(BrokerMetricNames.TOTAL_CONNECTION_COUNT, messagingServer, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
+            builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE, messagingServer, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
+            builder.build(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, messagingServer, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
+            builder.build(BrokerMetricNames.DISK_STORE_USAGE, messagingServer, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
          });
       }
    }
@@ -266,10 +266,10 @@ public class ManagementServiceImpl implements ManagementService {
          MetricsManager metricsManager = messagingServer.getMetricsManager();
          if (metricsManager != null) {
             metricsManager.registerAddressGauge(addressInfo.getName().toString(), builder -> {
-               builder.register(AddressMetricNames.ROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(addressInfo.getRoutedMessageCount()), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
-               builder.register(AddressMetricNames.UNROUTED_MESSAGE_COUNT, this, metrics -> Double.valueOf(addressInfo.getUnRoutedMessageCount()), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
-               builder.register(AddressMetricNames.ADDRESS_SIZE, this, metrics -> Double.valueOf(addressControl.getAddressSize()), AddressControl.ADDRESS_SIZE_DESCRIPTION);
-               builder.register(AddressMetricNames.PAGES_COUNT, this, metrics -> Double.valueOf(addressControl.getNumberOfPages()), AddressControl.NUMBER_OF_PAGES_DESCRIPTION);
+               builder.build(AddressMetricNames.ROUTED_MESSAGE_COUNT, addressInfo, metrics -> Double.valueOf(addressInfo.getRoutedMessageCount()), AddressControl.ROUTED_MESSAGE_COUNT_DESCRIPTION);
+               builder.build(AddressMetricNames.UNROUTED_MESSAGE_COUNT, addressInfo, metrics -> Double.valueOf(addressInfo.getUnRoutedMessageCount()), AddressControl.UNROUTED_MESSAGE_COUNT_DESCRIPTION);
+               builder.build(AddressMetricNames.ADDRESS_SIZE, addressInfo, metrics -> Double.valueOf(addressControl.getAddressSize()), AddressControl.ADDRESS_SIZE_DESCRIPTION);
+               builder.build(AddressMetricNames.PAGES_COUNT, addressInfo, metrics -> Double.valueOf(addressControl.getNumberOfPages()), AddressControl.NUMBER_OF_PAGES_DESCRIPTION);
             });
          }
       }
@@ -330,26 +330,26 @@ public class ManagementServiceImpl implements ManagementService {
          MetricsManager metricsManager = messagingServer.getMetricsManager();
          if (metricsManager != null) {
             metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> {
-               builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
-               builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
-               builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
-               builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
-
-               builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
-               builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
-               builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
-               builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
-
-               builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
-               builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
-               builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
-               builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
-
-               builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
-               builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
-               builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
-               builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
-               builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
+               builder.build(QueueMetricNames.MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
+               builder.build(QueueMetricNames.DURABLE_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
+               builder.build(QueueMetricNames.PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
+               builder.build(QueueMetricNames.DURABLE_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
+
+               builder.build(QueueMetricNames.DELIVERING_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+               builder.build(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+               builder.build(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
+               builder.build(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
+
+               builder.build(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+               builder.build(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, queue, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+               builder.build(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
+               builder.build(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, queue, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
+
+               builder.build(QueueMetricNames.MESSAGES_ACKNOWLEDGED, queue, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
+               builder.build(QueueMetricNames.MESSAGES_ADDED, queue, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
+               builder.build(QueueMetricNames.MESSAGES_KILLED, queue, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
+               builder.build(QueueMetricNames.MESSAGES_EXPIRED, queue, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
+               builder.build(QueueMetricNames.CONSUMER_COUNT, queue, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
             });
          }
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
index d9d19984e8..e5c84d48df 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
@@ -34,6 +34,7 @@ import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
 import io.netty.buffer.PooledByteBufAllocator;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.MetricsConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.slf4j.Logger;
@@ -80,15 +81,14 @@ public class MetricsManager {
    @FunctionalInterface
    public interface MetricGaugeBuilder {
 
-      void register(String metricName, Object state, ToDoubleFunction f, String description);
+      void build(String metricName, Object state, ToDoubleFunction f, String description);
    }
 
    public void registerQueueGauge(String address, String queue, Consumer<MetricGaugeBuilder> builder) {
-      final MeterRegistry meterRegistry = this.meterRegistry;
-      if (meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
+      if (this.meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
          return;
       }
-      final List<Gauge.Builder> newMeters = new ArrayList<>();
+      final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
       builder.accept((metricName, state, f, description) -> {
          Gauge.Builder meter = Gauge
             .builder("artemis." + metricName, state, f)
@@ -96,70 +96,66 @@ public class MetricsManager {
             .tag("address", address)
             .tag("queue", queue)
             .description(description);
-         newMeters.add(meter);
+         gaugeBuilders.add(meter);
       });
-      final String resource = ResourceNames.QUEUE + queue;
-      registerMeter(newMeters, resource);
+      registerMeters(gaugeBuilders, ResourceNames.QUEUE + queue);
    }
 
    public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) {
-      final MeterRegistry meterRegistry = this.meterRegistry;
-      if (meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
+      if (this.meterRegistry == null || !addressSettingsRepository.getMatch(address).isEnableMetrics()) {
          return;
       }
-      final List<Gauge.Builder> newMeters = new ArrayList<>();
+      final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
       builder.accept((metricName, state, f, description) -> {
          Gauge.Builder meter = Gauge
             .builder("artemis." + metricName, state, f)
             .tag("broker", brokerName)
             .tag("address", address)
             .description(description);
-         newMeters.add(meter);
+         gaugeBuilders.add(meter);
       });
-      final String resource = ResourceNames.ADDRESS + address;
-      registerMeter(newMeters, resource);
+      registerMeters(gaugeBuilders, ResourceNames.ADDRESS + address);
    }
 
    public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) {
-      final MeterRegistry meterRegistry = this.meterRegistry;
-      if (meterRegistry == null) {
+      if (this.meterRegistry == null) {
          return;
       }
-      final List<Gauge.Builder> newMeters = new ArrayList<>();
+      final List<Gauge.Builder> gaugeBuilders = new ArrayList<>();
       builder.accept((metricName, state, f, description) -> {
          Gauge.Builder meter = Gauge
             .builder("artemis." + metricName, state, f)
             .tag("broker", brokerName)
             .description(description);
-         newMeters.add(meter);
+         gaugeBuilders.add(meter);
       });
-      final String resource = ResourceNames.BROKER + "." + brokerName;
-      registerMeter(newMeters, resource);
+      registerMeters(gaugeBuilders, ResourceNames.BROKER + "." + brokerName);
    }
 
-   private void registerMeter(List<Gauge.Builder> newMeters, String resource) {
-      this.meters.compute(resource, (s, meters) -> {
-         //the old meters are ignored on purpose
-         meters = new ArrayList<>(newMeters.size());
-         for (Gauge.Builder gaugeBuilder : newMeters) {
-            Gauge gauge = gaugeBuilder.register(meterRegistry);
-            meters.add(gauge);
-            logger.debug("Registered meter: {}", gauge.getId());
-         }
-         return meters;
-      });
+   private void registerMeters(List<Gauge.Builder> gaugeBuilders, String resource) {
+      if (meters.get(resource) != null) {
+         throw ActiveMQMessageBundle.BUNDLE.metersAlreadyRegistered(resource);
+      }
+      logger.debug("Registering meters for {}", resource);
+      List<Meter> newMeters = new ArrayList<>(gaugeBuilders.size());
+      for (Gauge.Builder gaugeBuilder : gaugeBuilders) {
+         Gauge gauge = gaugeBuilder.register(meterRegistry);
+         newMeters.add(gauge);
+         logger.debug("Registered meter: {}", gauge.getId());
+      }
+      meters.put(resource, newMeters);
    }
 
-   public void remove(String component) {
-      meters.computeIfPresent(component, (s, meters) -> {
-         if (meters == null) {
-            return null;
-         }
-         for (Meter meter : meters) {
+   public void remove(String resource) {
+      List<Meter> resourceMeters = meters.remove(resource);
+      if (resourceMeters != null) {
+         logger.debug("Unregistering meters for {}", resource);
+         for (Meter meter : resourceMeters) {
             Meter removed = meterRegistry.remove(meter);
             logger.debug("Unregistered meter: {}", removed.getId());
          }
-         return null;
-      });
+      } else {
+         logger.debug("Attempted to unregister meters for {}, but none were found.", resource);
+      }
    }
 }
diff --git a/pom.xml b/pom.xml
index 147d820bae..5ecce22dd0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,7 +147,7 @@
       <servicemix.json-1.1.spec.version>2.9.0</servicemix.json-1.1.spec.version>
       <version.org.jacoco>0.8.8</version.org.jacoco>
       <version.org.jacoco.plugin>0.8.8</version.org.jacoco.plugin>
-      <version.micrometer>1.8.5</version.micrometer>
+      <version.micrometer>1.9.5</version.micrometer>
       <hamcrest.version>2.1</hamcrest.version>
       <junit.version>4.13.2</junit.version>
       <surefire.version>2.22.2</surefire.version>