You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2020/07/07 12:18:44 UTC
[activemq-artemis] 06/07: ARTEMIS-2834 leaking meters
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit f5d571013333d92f657f620858f827a7402b25eb
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Wed Jul 1 13:19:03 2020 -0500
ARTEMIS-2834 leaking meters
Move all of the meter registration code into the ManagementService
implementation (ManagementServiceImpl) to provide better life-cycle management.
---
.../core/server/impl/ActiveMQServerImpl.java | 27 +------
.../artemis/core/server/impl/QueueImpl.java | 46 +----------
.../management/impl/ManagementServiceImpl.java | 67 ++++++++++++++--
.../core/server/metrics/MetricsManager.java | 38 +++++----
.../failover/SharedStoreMetricsLeakTest.java | 89 ++++++++++++++++++++++
5 files changed, 167 insertions(+), 100 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 41edad9..fe25fe1 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -60,7 +60,6 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@@ -154,9 +153,7 @@ import org.apache.activemq.artemis.core.server.group.impl.RemoteGroupingHandler;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.impl.ManagementServiceImpl;
-import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
-import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQPluginRunnable;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBasePlugin;
@@ -165,6 +162,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBridgePlugin
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConnectionPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerConsumerPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerCriticalPlugin;
+import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
@@ -1173,7 +1171,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
stopComponent(managementService);
- unregisterMeters();
stopComponent(resourceManager);
stopComponent(postOffice);
@@ -2893,8 +2890,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
metricsManager = new MetricsManager(configuration.getName(), configuration.getMetricsConfiguration(), addressSettingsRepository);
}
- registerMeters();
-
postOffice = new PostOfficeImpl(this, storageManager, pagingManager, queueFactory, managementService, configuration.getMessageExpiryScanPeriod(), configuration.getAddressQueueScanPeriod(), configuration.getWildcardConfiguration(), configuration.getIDCacheSize(), configuration.isPersistIDCache(), addressSettingsRepository);
// This can't be created until node id is set
@@ -3078,19 +3073,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
callActivationCompleteCallbacks();
}
- private void registerMeters() {
- MetricsManager metricsManager = this.metricsManager; // volatile load
- if (metricsManager != null) {
- metricsManager.registerBrokerGauge(builder -> {
- builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
- builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
- builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
- builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
- builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
- });
- }
- }
-
@Override
public double getDiskStoreUsage() {
//this should not happen but if it does, return -1 to highlight it is not working
@@ -3101,13 +3083,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return FileStoreMonitor.calculateUsage(getPagingManager().getDiskUsableSpace(), getPagingManager().getDiskTotalSpace());
}
- private void unregisterMeters() {
- MetricsManager metricsManager = this.metricsManager; // volatile load
- if (metricsManager != null) {
- metricsManager.remove(ResourceNames.BROKER + "." + configuration.getName());
- }
- }
-
private void deploySecurityFromConfiguration() {
for (Map.Entry<String, Set<Role>> entry : configuration.getSecurityRoles().entrySet()) {
securityRepository.addMatch(entry.getKey(), entry.getValue(), true);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index d2e7d88..040e4a3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -52,8 +52,6 @@ import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
-import org.apache.activemq.artemis.api.core.management.QueueControl;
-import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.PriorityAware;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
@@ -63,8 +61,8 @@ import org.apache.activemq.artemis.core.paging.cursor.PageIterator;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
-import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
+import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings;
@@ -90,8 +88,6 @@ import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.Redistributor;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
-import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
-import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.HierarchicalRepositoryChangeListener;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -689,7 +685,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
this.factory = factory;
- registerMeters();
if (this.addressInfo != null && this.addressInfo.isPaused()) {
this.pause(false);
}
@@ -2264,8 +2259,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
slowConsumerReaperFuture.cancel(false);
}
- unregisterMeters();
-
tx.commit();
} catch (Exception e) {
tx.rollback();
@@ -4346,43 +4339,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
- private void registerMeters() {
- if (server != null && server.getMetricsManager() != null) {
- String addressName = address.toString();
- String queueName = name.toString();
- MetricsManager metricsManager = server.getMetricsManager();
-
- metricsManager.registerQueueGauge(addressName, queueName, (builder) -> {
- builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
- builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
- builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
- builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
- builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
-
- builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
- builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
- builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
- });
- }
- }
-
- private void unregisterMeters() {
- if (server != null && server.getMetricsManager() != null) {
- server.getMetricsManager().remove(ResourceNames.QUEUE + name);
- }
- }
-
private class AddressSettingsRepositoryListener implements HierarchicalRepositoryChangeListener {
@Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
index 207a2da..6e86677 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/management/impl/ManagementServiceImpl.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.management.AcceptorControl;
+import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
@@ -54,6 +55,7 @@ import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@@ -95,7 +97,9 @@ import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.core.server.metrics.AddressMetricNames;
+import org.apache.activemq.artemis.core.server.metrics.BrokerMetricNames;
import org.apache.activemq.artemis.core.server.metrics.MetricsManager;
+import org.apache.activemq.artemis.core.server.metrics.QueueMetricNames;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.ResourceManager;
@@ -215,15 +219,30 @@ public class ManagementServiceImpl implements ManagementService {
ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName();
registerInJMX(objectName, messagingServerControl);
registerInRegistry(ResourceNames.BROKER, messagingServerControl);
+ registerBrokerMeters();
return messagingServerControl;
}
+ private void registerBrokerMeters() {
+ MetricsManager metricsManager = messagingServer.getMetricsManager();
+ if (metricsManager != null) {
+ metricsManager.registerBrokerGauge(builder -> {
+ builder.register(BrokerMetricNames.CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getConnectionCount()), ActiveMQServerControl.CONNECTION_COUNT_DESCRIPTION);
+ builder.register(BrokerMetricNames.TOTAL_CONNECTION_COUNT, this, metrics -> Double.valueOf(messagingServer.getTotalConnectionCount()), ActiveMQServerControl.TOTAL_CONNECTION_COUNT_DESCRIPTION);
+ builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_DESCRIPTION);
+ builder.register(BrokerMetricNames.ADDRESS_MEMORY_USAGE_PERCENTAGE, this, metrics -> Double.valueOf(messagingServerControl.getAddressMemoryUsagePercentage()), ActiveMQServerControl.ADDRESS_MEMORY_USAGE_PERCENTAGE_DESCRIPTION);
+ builder.register(BrokerMetricNames.DISK_STORE_USAGE, this, metrics -> Double.valueOf(messagingServer.getDiskStoreUsage()), ActiveMQServerControl.DISK_STORE_USAGE_DESCRIPTION);
+ });
+ }
+ }
+
@Override
public synchronized void unregisterServer() throws Exception {
ObjectName objectName = objectNameBuilder.getActiveMQServerObjectName();
unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.BROKER);
+ unregisterMeters(ResourceNames.BROKER + "." + messagingServer.getConfiguration().getName());
}
@Override
@@ -260,7 +279,7 @@ public class ManagementServiceImpl implements ManagementService {
unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.ADDRESS + address);
- unregisterAddressMeters(address.toString());
+ unregisterMeters(ResourceNames.ADDRESS + address);
}
public synchronized void registerQueue(final Queue queue,
@@ -283,11 +302,13 @@ public class ManagementServiceImpl implements ManagementService {
ObjectName objectName = objectNameBuilder.getQueueObjectName(addressInfo.getName(), queue.getName(), queue.getRoutingType());
registerInJMX(objectName, queueControl);
registerInRegistry(ResourceNames.QUEUE + queue.getName(), queueControl);
+ registerQueueMeters(queue);
if (logger.isDebugEnabled()) {
logger.debug("registered queue " + objectName);
}
}
+
@Override
public synchronized void registerQueue(final Queue queue,
final SimpleString address,
@@ -300,11 +321,47 @@ public class ManagementServiceImpl implements ManagementService {
ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name, routingType);
unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.QUEUE + name);
+ unregisterMeters(ResourceNames.QUEUE + name);
if (messageCounterManager != null) {
messageCounterManager.unregisterMessageCounter(name.toString());
}
}
+ private void registerQueueMeters(final Queue queue) {
+ MetricsManager metricsManager = messagingServer.getMetricsManager();
+ if (metricsManager != null) {
+ metricsManager.registerQueueGauge(queue.getAddress().toString(), queue.getName().toString(), (builder) -> {
+ builder.register(QueueMetricNames.MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getMessageCount()), QueueControl.MESSAGE_COUNT_DESCRIPTION);
+ builder.register(QueueMetricNames.DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableMessageCount()), QueueControl.DURABLE_MESSAGE_COUNT_DESCRIPTION);
+ builder.register(QueueMetricNames.PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getPersistentSize()), QueueControl.PERSISTENT_SIZE_DESCRIPTION);
+ builder.register(QueueMetricNames.DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurablePersistentSize()), QueueControl.DURABLE_PERSISTENT_SIZE_DESCRIPTION);
+
+ builder.register(QueueMetricNames.DELIVERING_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDeliveringCount()), QueueControl.DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+ builder.register(QueueMetricNames.DELIVERING_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableDeliveringCount()), QueueControl.DURABLE_DELIVERING_MESSAGE_COUNT_DESCRIPTION);
+ builder.register(QueueMetricNames.DELIVERING_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDeliveringSize()), QueueControl.DELIVERING_SIZE_DESCRIPTION);
+ builder.register(QueueMetricNames.DELIVERING_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableDeliveringSize()), QueueControl.DURABLE_DELIVERING_SIZE_DESCRIPTION);
+
+ builder.register(QueueMetricNames.SCHEDULED_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getScheduledCount()), QueueControl.SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+ builder.register(QueueMetricNames.SCHEDULED_DURABLE_MESSAGE_COUNT, this, metrics -> Double.valueOf(queue.getDurableScheduledCount()), QueueControl.DURABLE_SCHEDULED_MESSAGE_COUNT_DESCRIPTION);
+ builder.register(QueueMetricNames.SCHEDULED_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getScheduledSize()), QueueControl.SCHEDULED_SIZE_DESCRIPTION);
+ builder.register(QueueMetricNames.SCHEDULED_DURABLE_PERSISTENT_SIZE, this, metrics -> Double.valueOf(queue.getDurableScheduledSize()), QueueControl.DURABLE_SCHEDULED_SIZE_DESCRIPTION);
+
+ builder.register(QueueMetricNames.MESSAGES_ACKNOWLEDGED, this, metrics -> Double.valueOf(queue.getMessagesAcknowledged()), QueueControl.MESSAGES_ACKNOWLEDGED_DESCRIPTION);
+ builder.register(QueueMetricNames.MESSAGES_ADDED, this, metrics -> Double.valueOf(queue.getMessagesAdded()), QueueControl.MESSAGES_ADDED_DESCRIPTION);
+ builder.register(QueueMetricNames.MESSAGES_KILLED, this, metrics -> Double.valueOf(queue.getMessagesKilled()), QueueControl.MESSAGES_KILLED_DESCRIPTION);
+ builder.register(QueueMetricNames.MESSAGES_EXPIRED, this, metrics -> Double.valueOf(queue.getMessagesExpired()), QueueControl.MESSAGES_EXPIRED_DESCRIPTION);
+ builder.register(QueueMetricNames.CONSUMER_COUNT, this, metrics -> Double.valueOf(queue.getConsumerCount()), QueueControl.CONSUMER_COUNT_DESCRIPTION);
+ });
+ }
+ }
+
+ private void unregisterMeters(final String name) {
+ MetricsManager metricsManager = messagingServer.getMetricsManager();
+ if (metricsManager != null) {
+ metricsManager.remove(name);
+ }
+ }
+
@Override
public synchronized void registerDivert(final Divert divert) throws Exception {
ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName().toString(), divert.getAddress().toString());
@@ -560,13 +617,6 @@ public class ManagementServiceImpl implements ManagementService {
}
}
- public void unregisterAddressMeters(String address) {
- MetricsManager metricsManager = messagingServer.getMetricsManager();
- if (metricsManager != null) {
- metricsManager.remove(ResourceNames.ADDRESS + address);
- }
- }
-
@Override
public void addNotificationListener(final NotificationListener listener) {
listeners.add(listener);
@@ -622,6 +672,7 @@ public class ManagementServiceImpl implements ManagementService {
for (String resourceName : resourceNames) {
unregisterFromRegistry(resourceName);
+ unregisterMeters(resourceName);
}
if (jmxManagementEnabled) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
index fb177bc..8fab39c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/metrics/MetricsManager.java
@@ -33,12 +33,14 @@ import io.micrometer.core.instrument.binder.jvm.JvmMemoryMetrics;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.jboss.logging.Logger;
public class MetricsManager {
+ private static final Logger log = Logger.getLogger(MetricsManager.class);
+
private final String brokerName;
private final MeterRegistry meterRegistry;
@@ -91,14 +93,7 @@ public class MetricsManager {
newMeters.add(meter);
});
final String resource = ResourceNames.QUEUE + queue;
- this.meters.compute(resource, (s, meters) -> {
- //the old meters are ignored on purpose
- meters = new ArrayList<>(newMeters.size());
- for (Gauge.Builder gauge : newMeters) {
- meters.add(gauge.register(meterRegistry));
- }
- return meters;
- });
+ registerMeter(newMeters, resource);
}
public void registerAddressGauge(String address, Consumer<MetricGaugeBuilder> builder) {
@@ -116,14 +111,7 @@ public class MetricsManager {
newMeters.add(meter);
});
final String resource = ResourceNames.ADDRESS + address;
- this.meters.compute(resource, (s, meters) -> {
- //the old meters are ignored on purpose
- meters = new ArrayList<>(newMeters.size());
- for (Gauge.Builder gauge : newMeters) {
- meters.add(gauge.register(meterRegistry));
- }
- return meters;
- });
+ registerMeter(newMeters, resource);
}
public void registerBrokerGauge(Consumer<MetricGaugeBuilder> builder) {
@@ -140,11 +128,19 @@ public class MetricsManager {
newMeters.add(meter);
});
final String resource = ResourceNames.BROKER + "." + brokerName;
+ registerMeter(newMeters, resource);
+ }
+
+ private void registerMeter(List<Gauge.Builder> newMeters, String resource) {
this.meters.compute(resource, (s, meters) -> {
//the old meters are ignored on purpose
meters = new ArrayList<>(newMeters.size());
- for (Gauge.Builder gauge : newMeters) {
- meters.add(gauge.register(meterRegistry));
+ for (Gauge.Builder gaugeBuilder : newMeters) {
+ Gauge gauge = gaugeBuilder.register(meterRegistry);
+ meters.add(gauge);
+ if (log.isDebugEnabled()) {
+ log.debug("Registered meter: " + gauge.getId());
+ }
}
return meters;
});
@@ -157,8 +153,8 @@ public class MetricsManager {
}
for (Meter meter : meters) {
Meter removed = meterRegistry.remove(meter);
- if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
- ActiveMQServerLogger.LOGGER.debug("Removed meter: " + removed.getId());
+ if (log.isDebugEnabled()) {
+ log.debug("Unregistered meter: " + removed.getId());
}
}
return null;
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java
new file mode 100755
index 0000000..43c904b
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/SharedStoreMetricsLeakTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.core.config.MetricsConfiguration;
+import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
+import org.apache.activemq.artemis.core.server.metrics.plugins.SimpleMetricsPlugin;
+import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Before;
+import org.junit.Test;
+
+public class SharedStoreMetricsLeakTest extends ClusterTestBase {
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ setupServers();
+ }
+
+ private void setupServers() throws Exception {
+ setupLiveServer(0, isFileStorage(), true, isNetty(), false);
+ setupBackupServer(1, 0, isFileStorage(), true, isNetty());
+
+ getServer(0).getConfiguration().setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true));
+ getServer(0).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null)));
+ getServer(1).getConfiguration().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration().setFailoverOnServerShutdown(true).setAllowFailBack(true));
+ getServer(1).getConfiguration().setMetricsConfiguration(new MetricsConfiguration().setJvmThread(false).setJvmGc(false).setJvmMemory(false).setPlugin(new SimpleMetricsPlugin().init(null)));
+
+ // configure cluster for both servers
+ setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 0, 1);
+ setupClusterConnection("cluster", "queues", MessageLoadBalancingType.ON_DEMAND, 1, isNetty(), 1, 0);
+ }
+
+ private boolean isNetty() {
+ return true;
+ }
+
+ @Test
+ public void testForMeterLeaks() throws Exception {
+ ActiveMQServer live = getServer(0);
+ ActiveMQServer backup = getServer(1);
+
+ live.start();
+ assertTrue(live.waitForActivation(5, TimeUnit.SECONDS));
+
+ backup.start();
+ assertFalse(backup.waitForActivation(1, TimeUnit.SECONDS));
+
+ // there should be a handful of metrics available from the ActiveMQServerImpl itself
+ long baseline = backup.getMetricsManager().getMeterRegistry().getMeters().size();
+
+ live.stop();
+ assertTrue(backup.waitForActivation(5, TimeUnit.SECONDS));
+
+ // after failover more meters should get registered
+ Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() > baseline, 2000, 100);
+
+ live.start();
+ assertTrue(live.waitForActivation(5, TimeUnit.SECONDS));
+
+ // after failback the number of registered meters should return to baseline
+ Wait.assertTrue(() -> backup.getMetricsManager().getMeterRegistry().getMeters().size() == baseline, 2000, 100);
+
+ live.stop();
+ backup.stop();
+ }
+}