You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/07 12:18:44 UTC

[activemq-artemis] 06/07: ARTEMIS-2834 leaking meters

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit f5d571013333d92f657f620858f827a7402b25eb
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Jul 1 13:19:03 2020 -0500

    ARTEMIS-2834 leaking meters
    
    Move all of the meter registration code into the ManagementServer
    implementation to provide better life-cycle management.
---
 .../core/server/impl/ActiveMQServerImpl.java       | 27 +------
 .../artemis/core/server/impl/QueueImpl.java        | 46 +----------
 .../management/impl/ManagementServiceImpl.java     | 67 ++++++++++++++--
 .../core/server/metrics/MetricsManager.java        | 38 +++++----
 .../failover/SharedStoreMetricsLeakTest.java       | 89 ++++++++++++++++++++++
 5 files changed, 167 insertions(+), 100 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 41edad9..fe25fe1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -60,7 +60,6 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -154,9 +153,7 @@ import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
 import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
-import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
 import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
-import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@@ -165,6 +162,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
 import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
@@ -1173,7 +1171,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
 
       stopComponent(managementService);
-      unregisterMeters();
       stopComponent(resourceManager);
       stopComponent(postOffice);
 
@@ -2893,8 +2890,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository);
       }
 
-      registerMeters();
-
       postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
 
       // This can't be created until node id is set
@@ -3078,19 +3073,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       callActivationCompleteCallbacks();
    }
 
-   private void registerMeters() {
-      MetricsManager metricsManager = this.metricsManager; // volatile load
-      if (metricsManager != null) {
-         metricsManager.registerBrokerGauge(builder -> {
-            builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
-            builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
-            builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
-            builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
-            builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
-         });
-      }
-   }
-
    @Override
    public double getDiskStoreUsage() {
       //this should not happen but if it does, return -1 to highlight it is not working
@@ -3101,13 +3083,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       return FileStoreMonitor.calculateUsage(getPagingManager().getDiskUsableSpace(), getPagingManager().getDiskTotalSpace());
    }
 
-   private void unregisterMeters() {
-      MetricsManager metricsManager = this.metricsManager; // volatile load
-      if (metricsManager != null) {
-         metricsManager.remove(ResourceNames.BROKER + "." + configuration.getName());
-      }
-   }
-
    private void deploySecurityFromConfiguration() {
       for (Map.Entry<String, Set<Role>> entry : configuration.getSecurityRoles().entrySet()) {
          securityRepository.addMatch(entry.getKey(), entry.getValue(), true);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d2e7d88..040e4a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -52,8 +52,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.PriorityAware;
 import org.apache.activemq.artemis.core.filter.Filter;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@@ -63,8 +61,8 @@ import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
 import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
 import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -90,8 +88,6 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
 import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
-import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -689,7 +685,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       this.factory = factory;
 
-      registerMeters();
       if (this.addressInfo != null && this.addressInfo.isPaused()) {
          this.pause(false);
       }
@@ -2264,8 +2259,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             slowConsumerReaperFuture.cancel(false);
          }
 
-         unregisterMeters();
-
          tx.commit();
       } catch (Exception e) {
          tx.rollback();
@@ -4346,43 +4339,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
-   private void registerMeters() {
-      if (server != null && server.getMetricsManager() != null) {
-         String addressName = address.toString();
-         String queueName = name.toString();
-         MetricsManager metricsManager = server.getMetricsManager();
-
-         metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {
-            builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
-            builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
-            builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
-            builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
-
-            builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
-            builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
-            builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
-            builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
-
-            builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
-            builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
-            builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
-            builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
-
-            builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
-            builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
-            builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
-            builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
-            builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
-         });
-      }
-   }
-
-   private void unregisterMeters() {
-      if (server != null && server.getMetricsManager() != null) {
-         server.getMetricsManager().remove(ResourceNames.QUEUE + name);
-      }
-   }
-
    private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener {
 
       @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 207a2da..6e86677 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
 import org.apache.activemq.artemis.api.core.management.AcceptorControl;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl;
 import org.apache.activemq.artemis.api.core.management.BridgeControl;
@@ -54,6 +55,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
 import org.apache.activemq.artemis.api.core.management.DivertControl;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.BridgeConfiguration;
 import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@@ -95,7 +97,9 @@ import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames;
+import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
 import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
+import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -215,15 +219,30 @@ public class ManagementServiceImpl implements ManagementService {
       ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName();
       registerInJMX(objectName, messagingServerControl);
       registerInRegistry(ResourceNames.BROKER, messagingServerControl);
+      registerBrokerMeters();
 
       return messagingServerControl;
    }
 
+   private void registerBrokerMeters() {
+      MetricsManager metricsManager = messagingServer.getMetricsManager();
+      if (metricsManager != null) {
+         metricsManager.registerBrokerGauge(builder -> {
+            builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
+            builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
+            builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
+            builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
+            builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
+         });
+      }
+   }
+
    @Override
    public synchronized void unregisterServer() throws Exception {
       ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName();
       unregisterFromJMX(objectName);
       unregisterFromRegistry(ResourceNames.BROKER);
+      unregisterMeters(ResourceNames.BROKER + "." + messagingServer.getConfiguration().getName());
    }
 
    @Override
@@ -260,7 +279,7 @@ public class ManagementServiceImpl implements ManagementService {
 
       unregisterFromJMX(objectName);
       unregisterFromRegistry(ResourceNames.ADDRESS + address);
-      unregisterAddressMeters(address.toString());
+      unregisterMeters(ResourceNames.ADDRESS + address);
    }
 
    public synchronized void registerQueue(final Queue queue,
@@ -283,11 +302,13 @@ public class ManagementServiceImpl implements ManagementService {
       ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType());
       registerInJMX(objectName, queueControl);
       registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
+      registerQueueMeters(queue);
 
       if (logger.isDebugEnabled()) {
          logger.debug("registered queue " + objectName);
       }
    }
+
    @Override
    public synchronized void registerQueue(final Queue queue,
                                           final SimpleString address,
@@ -300,11 +321,47 @@ public class ManagementServiceImpl implements ManagementService {
       ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
       unregisterFromJMX(objectName);
       unregisterFromRegistry(ResourceNames.QUEUE + name);
+      unregisterMeters(ResourceNames.QUEUE + name);
       if (messageCounterManager != null) {
          messageCounterManager.unregisterMessageCounter(name.toString());
       }
    }
 
+   private void registerQueueMeters(final Queue queue) {
+      MetricsManager metricsManager = messagingServer.getMetricsManager();
+      if (metricsManager != null) {
+         metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> {
+            builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
+            builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
+            builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
+            builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
+
+            builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+            builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+            builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
+            builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
+
+            builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+            builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+            builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
+            builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
+
+            builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
+            builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
+            builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
+            builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
+            builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
+         });
+      }
+   }
+
+   private void unregisterMeters(final String name) {
+      MetricsManager metricsManager = messagingServer.getMetricsManager();
+      if (metricsManager != null) {
+         metricsManager.remove(name);
+      }
+   }
+
    @Override
    public synchronized void registerDivert(final Divert divert) throws Exception {
       ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString());
@@ -560,13 +617,6 @@ public class ManagementServiceImpl implements ManagementService {
       }
    }
 
-   public void unregisterAddressMeters(String address) {
-      MetricsManager metricsManager = messagingServer.getMetricsManager();
-      if (metricsManager != null) {
-         metricsManager.remove(ResourceNames.ADDRESS + address);
-      }
-   }
-
    @Override
    public void addNotificationListener(final NotificationListener listener) {
       listeners.add(listener);
@@ -622,6 +672,7 @@ public class ManagementServiceImpl implements ManagementService {
 
       for (String resourceName : resourceNames) {
          unregisterFromRegistry(resourceName);
+         unregisterMeters(resourceName);
       }
 
       if (jmxManagementEnabled) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
index fb177bc..8fab39c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
@@ -33,12 +33,14 @@ import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
 import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.core.config.MetricsConfiguration;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.jboss.logging.Logger;
 
 public class MetricsManager {
 
+   private static final Logger log = Logger.getLogger(MetricsManager.class);
+
    private final String brokerName;
 
    private final MeterRegistry meterRegistry;
@@ -91,14 +93,7 @@ public class MetricsManager {
          newMeters.add(meter);
       });
       final String resource = ResourceNames.QUEUE + queue;
-      this.meters.compute(resource, (s, meters) -> {
-         //the old meters are ignored on purpose
-         meters = new ArrayList<>(newMeters.size());
-         for (Gauge.Builder gauge : newMeters) {
-            meters.add(gauge.register(meterRegistry));
-         }
-         return meters;
-      });
+      registerMeter(newMeters, resource);
    }
 
    public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) {
@@ -116,14 +111,7 @@ public class MetricsManager {
          newMeters.add(meter);
       });
       final String resource = ResourceNames.ADDRESS + address;
-      this.meters.compute(resource, (s, meters) -> {
-         //the old meters are ignored on purpose
-         meters = new ArrayList<>(newMeters.size());
-         for (Gauge.Builder gauge : newMeters) {
-            meters.add(gauge.register(meterRegistry));
-         }
-         return meters;
-      });
+      registerMeter(newMeters, resource);
    }
 
    public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) {
@@ -140,11 +128,19 @@ public class MetricsManager {
          newMeters.add(meter);
       });
       final String resource = ResourceNames.BROKER + "." + brokerName;
+      registerMeter(newMeters, resource);
+   }
+
+   private void registerMeter(List<Gauge.Builder> newMeters, String resource) {
       this.meters.compute(resource, (s, meters) -> {
          //the old meters are ignored on purpose
          meters = new ArrayList<>(newMeters.size());
-         for (Gauge.Builder gauge : newMeters) {
-            meters.add(gauge.register(meterRegistry));
+         for (Gauge.Builder gaugeBuilder : newMeters) {
+            Gauge gauge = gaugeBuilder.register(meterRegistry);
+            meters.add(gauge);
+            if (log.isDebugEnabled()) {
+               log.debug("Registered meter: " + gauge.getId());
+            }
          }
          return meters;
       });
@@ -157,8 +153,8 @@ public class MetricsManager {
          }
          for (Meter meter : meters) {
             Meter removed = meterRegistry.remove(meter);
-            if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-               ActiveMQServerLogger.LOGGER.debug("Removed meter: " + removed.getId());
+            if (log.isDebugEnabled()) {
+               log.debug("Unregistered meter: " + removed.getId());
             }
          }
          return null;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java
new file mode 100755
index 0000000..43c904b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.config.MetricsConfiguration;
+import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SharedStoreMetricsLeakTest extends ClusterTestBase {
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+
+      setupServers();
+   }
+
+   private void setupServers() throws Exception {
+      setupLiveServer(0, isFileStorage(), true, isNetty(), false);
+      setupBackupServer(1, 0, isFileStorage(), true, isNetty());
+
+      getServer(0).getConfiguration().setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true));
+      getServer(0).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null)));
+      getServer(1).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailoverOnServerShutdown(true).setAllowFailBack(true));
+      getServer(1).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null)));
+
+      // configure cluster for bother servers
+      setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+      setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+   }
+
+   private boolean isNetty() {
+      return true;
+   }
+
+   @Test
+   public void testForMeterLeaks() throws Exception {
+      ActiveMQServer live = getServer(0);
+      ActiveMQServer backup = getServer(1);
+
+      live.start();
+      assertTrue(live.waitForActivation(5, TimeUnit.SECONDS));
+
+      backup.start();
+      assertFalse(backup.waitForActivation(1, TimeUnit.SECONDS));
+
+      // there should be a handful of metrics available from the ActiveMQServerImpl itself
+      long baseline = backup.getMetricsManager().getMeterRegistry().getMeters().size();
+
+      live.stop();
+      assertTrue(backup.waitForActivation(5, TimeUnit.SECONDS));
+
+      // after failover more meters should get registered
+      Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() > baseline, 2000, 100);
+
+      live.start();
+      assertTrue(live.waitForActivation(5, TimeUnit.SECONDS));
+
+      // after failback the number of registered meters should return to baseline
+      Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() == baseline, 2000, 100);
+
+      live.stop();
+      backup.stop();
+   }
+}