You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/08/01 18:49:51 UTC
[geode] branch feature/CleanMicrometer updated: WIP: cleaned up
more imports and added CCUStats.kt, AsyncEventQueueStats.kt
This is an automated email from the ASF dual-hosted git repository.
udo pushed a commit to branch feature/CleanMicrometer
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/CleanMicrometer by this push:
new ee0c87d WIP: cleaned up more imports and added CCUStats.kt, AsyncEventQueueStats.kt
ee0c87d is described below
commit ee0c87d107ce34804946dc1c4d86e44547247f66
Author: Udo Kohlmeyer <uk...@pivotal.io>
AuthorDate: Wed Aug 1 11:49:36 2018 -0700
WIP: cleaned up more imports and added CCUStats.kt, AsyncEventQueueStats.kt
---
.../asyncqueue/internal/AsyncEventQueueImpl.java | 1 +
.../cache/client/internal/ConnectionImpl.java | 1 +
.../cache/client/internal/GetEventValueOp.java | 1 +
.../cache/query/internal/SortedResultsBag.java | 1 +
.../geode/cache/server/internal/LoadMonitor.java | 16 +-
.../internal/membership/gms/Services.java | 2 +-
.../distributed/internal/tcpserver/TcpServer.java | 8 +-
.../geode/internal/admin/StatAlertDefinition.java | 3 -
.../internal/cache/BucketPersistenceAdvisor.java | 1 +
.../org/apache/geode/internal/cache/HARegion.java | 1 +
.../internal/cache/control/HeapMemoryMonitor.java | 3 -
.../internal/cache/tier/sockets/BaseCommand.java | 4 +-
.../cache/tier/sockets/CacheClientUpdater.java | 11 +-
.../internal/cache/tier/sockets/command/Get70.java | 9 +-
.../monitoring/ThreadsMonitoringProcess.java | 2 +-
.../internal/beans/MemberMBeanBridge.java | 1 -
.../org/apache/geode/statistics/cache/CCUStats.kt | 53 ++++++
.../geode/statistics/cache/CacheServerStats.kt | 10 +-
.../geode/statistics/wan/AsyncEventQueueStats.java | 186 ---------------------
.../geode/statistics/wan/AsyncEventQueueStats.kt | 21 +++
.../geode/statistics/wan/GatewaySenderStats.kt | 73 ++++----
21 files changed, 143 insertions(+), 265 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index d011e28..69c5ad4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -30,6 +30,7 @@ import org.apache.geode.internal.cache.RegionQueue;
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import org.apache.geode.statistics.wan.AsyncEventQueueStats;
public class AsyncEventQueueImpl implements AsyncEventQueue {
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
index 26de256..12a52e8 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
@@ -44,6 +44,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.statistics.client.connection.ConnectionStats;
/**
* A single client to server connection.
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java
index a3f7494..e2bc3e9 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java
@@ -20,6 +20,7 @@ import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.statistics.client.connection.ConnectionStats;
/**
* Gets (full) value (unlike GetOp, which may get either a full value or a delta depending upon
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java
index af50e8d..2195e26 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java
@@ -25,6 +25,7 @@ import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
import org.apache.geode.cache.query.types.CollectionType;
import org.apache.geode.cache.query.types.ObjectType;
import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.statistics.cache.CachePerfStats;
/**
* This results set is used to sort the data allowing duplicates. If the data being added is already
diff --git a/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
index f0b50e8..a7dd6c8 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
@@ -28,18 +28,17 @@ import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.MembershipManager;
import org.apache.geode.internal.cache.CacheServerAdvisor;
import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.statistics.cache.CacheServerStats;
/**
- * A class which monitors the load on a bridge server and periodically sends updates to the locator.
- *
+ * A class which monitors the load on a bridge server and periodically sends updates to the
+ * locator.
* @since GemFire 5.7
- *
*/
public class LoadMonitor implements ConnectionListener {
private static final Logger logger = LogService.getLogger();
@@ -53,7 +52,7 @@ public class LoadMonitor implements ConnectionListener {
protected CacheServerStats stats;
public LoadMonitor(ServerLoadProbe probe, int maxConnections, long pollInterval,
- int forceUpdateFrequency, CacheServerAdvisor advisor) {
+ int forceUpdateFrequency, CacheServerAdvisor advisor) {
this.probe = probe;
this.metrics = new ServerMetricsImpl(maxConnections);
this.pollingThread = new PollingThread(pollInterval, forceUpdateFrequency);
@@ -70,7 +69,8 @@ public class LoadMonitor implements ConnectionListener {
this.location = location;
this.pollingThread.start();
this.stats = cacheServerStats;
- this.stats.setLoad(lastLoad);
+ this.stats.setLoad(lastLoad.getConnectionLoad(), lastLoad.getLoadPerConnection(),
+ lastLoad.getSubscriptionConnectionLoad(), lastLoad.getLoadPerSubscriptionConnection());
}
/**
@@ -113,7 +113,6 @@ public class LoadMonitor implements ConnectionListener {
/**
* Keeps track of the clients that have added a queue since the last load was sent to the
* server-locator.
- *
* @since GemFire 5.7
*/
protected final ArrayList clientIds = new ArrayList();
@@ -199,7 +198,8 @@ public class LoadMonitor implements ConnectionListener {
locators);
}
- stats.setLoad(load);
+ stats.setLoad(load.getConnectionLoad(), load.getLoadPerConnection(),
+ load.getSubscriptionConnectionLoad(), load.getLoadPerSubscriptionConnection());
if (locators != null) {
CacheServerLoadMessage message =
new CacheServerLoadMessage(load, location, myClientIds);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
index eb73fe6..b37e810 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
@@ -21,7 +21,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
-import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.membership.DistributedMembershipListener;
@@ -47,6 +46,7 @@ import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.security.SecurityServiceFactory;
import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.statistics.distributed.DMStats;
@SuppressWarnings("ConstantConditions")
public class Services {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index f9278e0..8a1d858 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -46,7 +46,6 @@ import org.apache.geode.cache.IncompatibleVersionException;
import org.apache.geode.cache.UnsupportedVersionException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionConfigImpl;
-import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.InternalLocator;
@@ -356,7 +355,7 @@ public class TcpServer {
*/
private void processRequest(final Socket socket) {
executor.execute(() -> {
- long startTime = DistributionStats.getStatTime();
+ long startTime = System.nanoTime();
DataInputStream input = null;
try {
socket.setSoTimeout(READ_TIMEOUT);
@@ -486,7 +485,7 @@ public class TcpServer {
handler.endRequest(request, startTime);
- startTime = DistributionStats.getStatTime();
+ startTime = System.nanoTime();
if (response != null) {
DataOutputStream output = new DataOutputStream(socket.getOutputStream());
if (versionOrdinal != Version.CURRENT_ORDINAL) {
@@ -523,8 +522,7 @@ public class TcpServer {
try {
ClientProtocolService clientProtocolService = clientProtocolServiceLoader.lookupService();
- clientProtocolService.initializeStatistics("LocatorStats",
- internalLocator.getDistributedSystem().getStatisticsFactory());
+ clientProtocolService.initializeStatistics("LocatorStats");
try (ClientProtocolProcessor pipeline = clientProtocolService.createProcessorForLocator(
internalLocator, internalLocator.getCache().getSecurityService())) {
while (!pipeline.socketProcessingIsFinished()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java
index 34540a2..d7e432b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java
@@ -15,7 +15,6 @@
package org.apache.geode.internal.admin;
import org.apache.geode.DataSerializable;
-import org.apache.geode.statistics.StatisticsFactory;
import org.apache.geode.internal.admin.statalerts.StatisticInfo;
/**
@@ -74,8 +73,6 @@ public interface StatAlertDefinition extends DataSerializable {
*/
Number[] getValue(Number[] vals);
- boolean verify(StatisticsFactory factory);
-
boolean hasDecorator(String decoratorID);
StatAlertDefinition getDecorator(String decoratorID);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
index e6f9ded..22b657b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
@@ -42,6 +42,7 @@ import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.util.TransformUtils;
+import org.apache.geode.statistics.disk.DiskRegionStats;
public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index cafc43f..16e8ffe 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.statistics.cache.CachePerfStats;
/**
* This region is being implemented to suppress distribution of puts and to allow localDestroys on
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index a45b846..737937b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -47,9 +47,6 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.LoggingThreadGroup;
import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.statistics.GemFireStatSampler;
-import org.apache.geode.internal.statistics.LocalStatListener;
-import org.apache.geode.internal.statistics.StatisticsImpl;
import org.apache.geode.statistics.resourcemanger.ResourceManagerStats;
/**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 82cd631..007c9b6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -48,7 +48,6 @@ import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.cache.query.types.CollectionType;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.Version;
@@ -81,6 +80,7 @@ import org.apache.geode.internal.offheap.OffHeapHelper;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.internal.sequencelog.EntryLogger;
import org.apache.geode.security.GemFireSecurityException;
+import org.apache.geode.statistics.cache.CacheServerStats;
public abstract class BaseCommand implements Command {
protected static final Logger logger = LogService.getLogger();
@@ -141,7 +141,7 @@ public abstract class BaseCommand implements Command {
public void execute(Message clientMessage, ServerConnection serverConnection,
SecurityService securityService) {
// Read the request and update the statistics
- long start = DistributionStats.getStatTime();
+ long start = System.nanoTime();
if (EntryLogger.isEnabled() && serverConnection != null) {
EntryLogger.setSource(serverConnection.getMembershipID(), "c2s");
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index e84ba27..9c6da5b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -37,11 +37,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.InvalidDeltaException;
-import org.apache.geode.statistics.StatisticDescriptor;
-import org.apache.geode.statistics.Statistics;
-import org.apache.geode.statistics.StatisticsFactory;
-import org.apache.geode.statistics.StatisticsType;
-import org.apache.geode.statistics.StatisticsTypeFactory;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.InterestResultPolicy;
import org.apache.geode.cache.Operation;
@@ -88,10 +83,10 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.sequencelog.EntryLogger;
-import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
import org.apache.geode.security.AuthenticationFailedException;
import org.apache.geode.security.AuthenticationRequiredException;
import org.apache.geode.security.GemFireSecurityException;
+import org.apache.geode.statistics.cache.CCUStats;
/**
* {@code CacheClientUpdater} is a thread that processes update messages from a cache server and
@@ -288,7 +283,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// this holds the connection which this threads reads
this.eManager = eManager;
this.endpoint = endpoint;
- this.stats = new CCUStats(this.system.getStatisticsFactory(), this.location);
+ this.stats = new CCUStats(this.location.toString());
// Create the connection...
final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -554,8 +549,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
// ignore
}
- this.stats.close();
-
// close the helper
if (this.cacheHelper != null) {
this.cacheHelper.close();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
index e3da197..0497259 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
@@ -20,7 +20,6 @@ import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.internal.GetOp;
import org.apache.geode.cache.operations.GetOperationContext;
import org.apache.geode.cache.operations.internal.GetOperationContextImpl;
-import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.i18n.StringId;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.LocalRegion;
@@ -31,7 +30,6 @@ import org.apache.geode.internal.cache.tier.CachedRegionHelper;
import org.apache.geode.internal.cache.tier.Command;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
-import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
@@ -47,6 +45,7 @@ import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.security.NotAuthorizedException;
import org.apache.geode.security.ResourcePermission.Operation;
import org.apache.geode.security.ResourcePermission.Resource;
+import org.apache.geode.statistics.cache.CacheServerStats;
public class Get70 extends BaseCommand {
@@ -71,7 +70,7 @@ public class Get70 extends BaseCommand {
// requiresResponse = true;
{
long oldStart = start;
- start = DistributionStats.getStatTime();
+ start = System.nanoTime();
stats.incReadGetRequestTime(start - oldStart);
}
// Retrieve the data from the message parts
@@ -194,7 +193,7 @@ public class Get70 extends BaseCommand {
data = securityService.postProcess(regionName, key, data, entry.isObject);
long oldStart = start;
- start = DistributionStats.getStatTime();
+ start = System.nanoTime();
stats.incProcessGetTime(start - oldStart);
if (region instanceof PartitionedRegion) {
@@ -220,7 +219,7 @@ public class Get70 extends BaseCommand {
logger.debug("{}: Wrote get response back to {} for region {} {}", serverConnection.getName(),
serverConnection.getSocketString(), regionName, entry);
}
- stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start);
+ stats.incWriteGetResponseTime(System.nanoTime() - start);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
index 166346f..652253b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
@@ -26,9 +26,9 @@ import org.apache.geode.distributed.internal.DistributionConfigImpl;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.ResourceManagerStats;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.statistics.resourcemanger.ResourceManagerStats;
public class ThreadsMonitoringProcess extends TimerTask {
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
index 7dffd7b..d5aef60 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
@@ -42,7 +42,6 @@ import javax.management.ObjectName;
import org.apache.logging.log4j.Logger;
import org.apache.geode.internal.statistics.InternalDistributedSystemStats;
-import org.apache.geode.statistics.Statistics;
import org.apache.geode.statistics.StatisticsType;
import org.apache.geode.cache.DiskStore;
import org.apache.geode.cache.Region;
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CCUStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CCUStats.kt
new file mode 100644
index 0000000..8d27e57
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CCUStats.kt
@@ -0,0 +1,53 @@
+package org.apache.geode.statistics.cache
+
+import org.apache.geode.internal.cache.tier.sockets.MessageStats
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.GaugeStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+
+/**
+ * Stats for a CacheClientUpdater, tagged by server location: a counter of bytes received and
+ * gauges for the messages (and their bytes) currently being received or processed.
+ *
+ * @since GemFire 5.7
+ */
+class CCUStats internal constructor(private val serverLocation: String) : MicrometerMeterGroup("CacheClientUpdaterStats-$serverLocation"), MessageStats {
+
+ override fun getCommonTags(): Array<String> = arrayOf("serverLocation", serverLocation) // key/value pair
+
+ private val clientUpdaterReceivedBytesMeter = CounterStatisticMeter("client.updater.received.bytes.count", "Total number of bytes received from the server.", unit = "bytes")
+ private val clientUpdateMessagesReceivedMeter = GaugeStatisticMeter("client.updater.received.messages.count", "Current number of message being received off the network or being processed after reception.")
+ private val clientUpdateMessagesReceivedBytesMeter = GaugeStatisticMeter("client.updater.received.messages.bytes", "Current number of bytes consumed by messages being received or processed.", unit = "bytes")
+
+ override fun initializeStaticMeters() { // NOTE(review): if the MicrometerMeterGroup constructor calls this, the meter vals above are not yet initialized — confirm
+ registerMeter(clientUpdaterReceivedBytesMeter)
+ registerMeter(clientUpdateMessagesReceivedMeter)
+ registerMeter(clientUpdateMessagesReceivedBytesMeter)
+ }
+
+ override fun incReceivedBytes(bytes: Long) {
+ clientUpdaterReceivedBytesMeter.increment(bytes)
+ }
+
+ override fun incSentBytes(v: Long) {
+ // Intentionally a no-op: this stats group never sends messages, so there is nothing to record.
+ }
+
+ override fun incMessagesBytesBeingReceived(bytes: Int) {
+ clientUpdateMessagesReceivedBytesMeter.increment(bytes)
+ }
+
+ override fun decMessagesBytesBeingReceived(bytes: Int) {
+ clientUpdateMessagesReceivedBytesMeter.decrement(bytes)
+ }
+
+ fun incMessagesBeingReceived(bytes: Int) {
+ clientUpdateMessagesReceivedMeter.increment() // one more message in flight
+ incMessagesBytesBeingReceived(bytes) // and its size added to the in-flight byte gauge
+ }
+
+ fun decMessagesBeingReceived(bytes: Int) {
+ clientUpdateMessagesReceivedMeter.decrement() // one message finished
+ decMessagesBytesBeingReceived(bytes) // and its size removed from the in-flight byte gauge
+ }
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
index fe20d90..1851e52 100644
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
@@ -605,11 +605,11 @@ open class CacheServerStats @JvmOverloads constructor(private val ownerName: Str
cacheServerClientReadyResponseWrittenMeter.increment()
}
- fun setLoad(connectionLoad: Int, loadPerConnection: Int, queueLoad: Int, loadPerQueue: Int) {
- cacheServerConnectionLoadMeter.increment(connectionLoad)
- cacheServerLoadPerConnectionMeter.increment(loadPerConnection)
- cacheServerQueueLoadMeter.increment(queueLoad)
- cacheServerLoadPerQueueMeter.increment(loadPerQueue)
+ fun setLoad(connectionLoad: Float, loadPerConnection: Float, queueLoad: Float, loadPerQueue: Float) { // passes each of the four load figures, widened to Double, to its own meter
+ cacheServerConnectionLoadMeter.increment(connectionLoad.toDouble()) // NOTE(review): increment() inside a "set" function presumably adds to the previous value on every call rather than replacing it — confirm against the meter API (applies to all four calls)
+ cacheServerLoadPerConnectionMeter.increment(loadPerConnection.toDouble())
+ cacheServerQueueLoadMeter.increment(queueLoad.toDouble())
+ cacheServerLoadPerQueueMeter.increment(loadPerQueue.toDouble())
}
val cnxPoolHelper: PoolStatHelper
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.java b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.java
deleted file mode 100644
index 746dd1b..0000000
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.statistics.wan;
-
-import org.apache.geode.statistics.StatisticDescriptor;
-import org.apache.geode.statistics.StatisticsFactory;
-import org.apache.geode.statistics.StatisticsType;
-import org.apache.geode.statistics.StatisticsTypeFactory;
-import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
-
-public class AsyncEventQueueStats extends GatewaySenderStats {
-
- public static final String typeName = "AsyncEventQueueStatistics";
-
- /**
- * The <code>StatisticsType</code> of the statistics
- */
- private StatisticsType type;
-
- @Override
- protected void initializeStats(StatisticsFactory factory) {
- type = factory.createType(typeName, "Stats for activity in the AsyncEventQueue",
- new StatisticDescriptor[]{
- factory.createIntCounter(GatewaySenderStats.EVENTS_RECEIVED, "Number of events received by this queue.",
- "operations"),
- factory.createIntCounter(GatewaySenderStats.EVENTS_QUEUED, "Number of events added to the event queue.",
- "operations"),
- factory.createLongCounter(GatewaySenderStats.EVENT_QUEUE_TIME, "Total time spent queueing events.",
- "nanoseconds"),
- factory.createIntGauge(GatewaySenderStats.EVENT_QUEUE_SIZE, "Size of the event queue.", "operations",
- false),
- factory.createIntGauge(GatewaySenderStats.SECONDARY_EVENT_QUEUE_SIZE, "Size of the secondary event queue.",
- "operations", false),
- factory.createIntGauge(GatewaySenderStats.EVENTS_PROCESSED_BY_PQRM,
- "Total number of events processed by Parallel Queue Removal Message(PQRM).",
- "operations", false),
- factory.createIntGauge(GatewaySenderStats.TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
- "operations", false),
- factory.createIntCounter(GatewaySenderStats.EVENTS_NOT_QUEUED_CONFLATED,
- "Number of events received but not added to the event queue because the queue already contains an event with the event's key.",
- "operations"),
- factory.createIntCounter(GatewaySenderStats.EVENTS_CONFLATED_FROM_BATCHES,
- "Number of events conflated from batches.", "operations"),
- factory.createIntCounter(GatewaySenderStats.EVENTS_DISTRIBUTED,
- "Number of events removed from the event queue and sent.", "operations"),
- factory.createIntCounter(GatewaySenderStats.EVENTS_EXCEEDING_ALERT_THRESHOLD,
- "Number of events exceeding the alert threshold.", "operations", false),
- factory.createLongCounter(GatewaySenderStats.BATCH_DISTRIBUTION_TIME,
- "Total time spent distributing batches of events to receivers.", "nanoseconds"),
- factory.createIntCounter(GatewaySenderStats.BATCHES_DISTRIBUTED,
- "Number of batches of events removed from the event queue and sent.", "operations"),
- factory.createIntCounter(GatewaySenderStats.BATCHES_REDISTRIBUTED,
- "Number of batches of events removed from the event queue and resent.",
- "operations", false),
- factory.createIntCounter(GatewaySenderStats.UNPROCESSED_TOKENS_ADDED_BY_PRIMARY,
- "Number of tokens added to the secondary's unprocessed token map by the primary (though a listener).",
- "tokens"),
- factory.createIntCounter(GatewaySenderStats.UNPROCESSED_EVENTS_ADDED_BY_SECONDARY,
- "Number of events added to the secondary's unprocessed event map by the secondary.",
- "events"),
- factory.createIntCounter(GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY,
- "Number of events removed from the secondary's unprocessed event map by the primary (though a listener).",
- "events"),
- factory.createIntCounter(GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY,
- "Number of tokens removed from the secondary's unprocessed token map by the secondary.",
- "tokens"),
- factory.createIntCounter(GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT,
- "Number of events removed from the secondary's unprocessed event map by a timeout.",
- "events"),
- factory.createIntCounter(GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT,
- "Number of tokens removed from the secondary's unprocessed token map by a timeout.",
- "tokens"),
- factory.createIntGauge(GatewaySenderStats.UNPROCESSED_EVENT_MAP_SIZE,
- "Current number of entries in the secondary's unprocessed event map.", "events",
- false),
- factory.createIntGauge(GatewaySenderStats.UNPROCESSED_TOKEN_MAP_SIZE,
- "Current number of entries in the secondary's unprocessed token map.", "tokens",
- false),
- factory.createIntGauge(GatewaySenderStats.CONFLATION_INDEXES_MAP_SIZE,
- "Current number of entries in the conflation indexes map.", "events"),
- factory.createIntCounter(GatewaySenderStats.NOT_QUEUED_EVENTS, "Number of events not added to queue.",
- "events"),
- factory.createIntCounter(
- GatewaySenderStats.EVENTS_DROPPED_DUE_TO_PRIMARY_SENDER_NOT_RUNNING,
- "Number of events dropped because the primary gateway sender is not running.",
- "events"),
- factory.createIntCounter(GatewaySenderStats.EVENTS_FILTERED,
- "Number of events filtered through GatewayEventFilter.", "events"),
- factory.createIntCounter(GatewaySenderStats.LOAD_BALANCES_COMPLETED, "Number of load balances completed",
- "operations"),
- factory.createIntGauge(GatewaySenderStats.LOAD_BALANCES_IN_PROGRESS, "Number of load balances in progress",
- "operations"),
- factory.createLongCounter(GatewaySenderStats.LOAD_BALANCE_TIME,
- "Total time spent load balancing this sender",
- "nanoseconds"),
- factory.createIntCounter(GatewaySenderStats.SYNCHRONIZATION_EVENTS_ENQUEUED,
- "Number of synchronization events added to the event queue.", "operations"),
- factory.createIntCounter(GatewaySenderStats.SYNCHRONIZATION_EVENTS_PROVIDED,
- "Number of synchronization events provided to other members.", "operations"),});
-
- // Initialize id fields
- GatewaySenderStats.eventsReceivedId = type.nameToId(GatewaySenderStats.EVENTS_RECEIVED);
- GatewaySenderStats.eventsQueuedId = type.nameToId(GatewaySenderStats.EVENTS_QUEUED);
- GatewaySenderStats.eventsNotQueuedConflatedId = type.nameToId(
- GatewaySenderStats.EVENTS_NOT_QUEUED_CONFLATED);
- GatewaySenderStats.eventQueueTimeId = type.nameToId(GatewaySenderStats.EVENT_QUEUE_TIME);
- GatewaySenderStats.eventQueueSizeId = type.nameToId(GatewaySenderStats.EVENT_QUEUE_SIZE);
- GatewaySenderStats.secondaryEventQueueSizeId = type.nameToId(
- GatewaySenderStats.SECONDARY_EVENT_QUEUE_SIZE);
- GatewaySenderStats.eventsProcessedByPQRMId = type.nameToId(
- GatewaySenderStats.EVENTS_PROCESSED_BY_PQRM);
- GatewaySenderStats.eventTmpQueueSizeId = type.nameToId(GatewaySenderStats.TMP_EVENT_QUEUE_SIZE);
- GatewaySenderStats.eventsDistributedId = type.nameToId(GatewaySenderStats.EVENTS_DISTRIBUTED);
- GatewaySenderStats.eventsExceedingAlertThresholdId = type.nameToId(
- GatewaySenderStats.EVENTS_EXCEEDING_ALERT_THRESHOLD);
- GatewaySenderStats.batchDistributionTimeId = type.nameToId(
- GatewaySenderStats.BATCH_DISTRIBUTION_TIME);
- GatewaySenderStats.batchesDistributedId = type.nameToId(GatewaySenderStats.BATCHES_DISTRIBUTED);
- GatewaySenderStats.batchesRedistributedId = type.nameToId(
- GatewaySenderStats.BATCHES_REDISTRIBUTED);
- GatewaySenderStats.unprocessedTokensAddedByPrimaryId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_TOKENS_ADDED_BY_PRIMARY);
- GatewaySenderStats.unprocessedEventsAddedBySecondaryId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_EVENTS_ADDED_BY_SECONDARY);
- GatewaySenderStats.unprocessedEventsRemovedByPrimaryId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY);
- GatewaySenderStats.unprocessedTokensRemovedBySecondaryId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY);
- GatewaySenderStats.unprocessedEventsRemovedByTimeoutId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT);
- GatewaySenderStats.unprocessedTokensRemovedByTimeoutId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT);
- GatewaySenderStats.unprocessedEventMapSizeId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_EVENT_MAP_SIZE);
- GatewaySenderStats.unprocessedTokenMapSizeId = type.nameToId(
- GatewaySenderStats.UNPROCESSED_TOKEN_MAP_SIZE);
- GatewaySenderStats.conflationIndexesMapSizeId = type.nameToId(
- GatewaySenderStats.CONFLATION_INDEXES_MAP_SIZE);
- GatewaySenderStats.notQueuedEventsId = type.nameToId(GatewaySenderStats.NOT_QUEUED_EVENTS);
- GatewaySenderStats.eventsDroppedDueToPrimarySenderNotRunningId =
- type.nameToId(GatewaySenderStats.EVENTS_DROPPED_DUE_TO_PRIMARY_SENDER_NOT_RUNNING);
- GatewaySenderStats.eventsFilteredId = type.nameToId(GatewaySenderStats.EVENTS_FILTERED);
- GatewaySenderStats.eventsConflatedFromBatchesId = type.nameToId(
- GatewaySenderStats.EVENTS_CONFLATED_FROM_BATCHES);
- GatewaySenderStats.loadBalancesCompletedId = type.nameToId(
- GatewaySenderStats.LOAD_BALANCES_COMPLETED);
- GatewaySenderStats.loadBalancesInProgressId = type.nameToId(
- GatewaySenderStats.LOAD_BALANCES_IN_PROGRESS);
- GatewaySenderStats.loadBalanceTimeId = type.nameToId(GatewaySenderStats.LOAD_BALANCE_TIME);
- GatewaySenderStats.synchronizationEventsEnqueuedId = type.nameToId(
- GatewaySenderStats.SYNCHRONIZATION_EVENTS_ENQUEUED);
- GatewaySenderStats.synchronizationEventsProvidedId = type.nameToId(
- GatewaySenderStats.SYNCHRONIZATION_EVENTS_PROVIDED);
- }
-
- /**
- * Constructor.
- * @param factory The <code>StatisticsFactory</code> which creates the <code>Statistics</code>
- * instance
- * @param asyncQueueId The id of the <code>AsyncEventQueue</code> used to generate the name of
- * the
- * <code>Statistics</code>
- */
- public AsyncEventQueueStats(StatisticsFactory factory, String asyncQueueId) {
- super();
- initializeStats(factory);
- this.stats = factory.createAtomicStatistics(type, "asyncEventQueueStats-" + asyncQueueId);
- }
-
- public StatisticsType getType() {
- return type;
- }
-}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.kt
new file mode 100644
index 0000000..118e2bd
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.kt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.statistics.wan
+
+/** Meter group for one AsyncEventQueue: reuses the [GatewaySenderStats] meters under the "async" name prefix and returns the queue name as its common tag. */
+class AsyncEventQueueStats(private val asyncQueueName: String) : GatewaySenderStats(asyncQueueName, "AsyncQueueStats-$asyncQueueName") {
+ override fun getCommonTags(): Array<String> = arrayOf("asyncQueueName", asyncQueueName)
+ override val queueStatPrefix: String get() = "async" // plain getter, not `by lazy`: GatewaySenderStats reads this in its own property initializers, which run before any delegate declared here is assigned
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt
index 95c2907..06ebc29 100644
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt
@@ -20,42 +20,43 @@ import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
import org.apache.geode.statistics.internal.micrometer.TimerStatisticMeter
import org.apache.geode.statistics.util.NOW_NANOS
-class GatewaySenderStats(private val gateReceiverName: String) : MicrometerMeterGroup("GatewayReceiverStats-$gateReceiverName") {
-
- override fun getCommonTags(): Array<String> = arrayOf("gatewaySenderName", gateReceiverName)
-
- private val gatewaySenderEventsReceivedMeter = CounterStatisticMeter("gateway.sender.event.received.count", "Number of events received by this Sender.")
- private val gatewaySenderEventsQueueMeter = CounterStatisticMeter("gateway.sender.event.queued.count", "Number of events added to the event queue.")
- private val gatewaySenderEventsQueueTimer = TimerStatisticMeter("gateway.sender.event.queue.time", "Total time spent queueing events.", unit = "nanoseconds")
- private val gatewaySenderEventQueueSizeMeter = GaugeStatisticMeter("gateway.sender.event.queue.size", "Size of the event queue.", arrayOf("queueType", "primary"))
- private val gatewaySenderSecondaryEventQueueSizeMeter = GaugeStatisticMeter("gateway.sender.event.queue.size", "Size of secondary event queue.", arrayOf("queueType", "secondary"))
- private val gatewaySenderEventsProcessedByPQRMMeter = GaugeStatisticMeter("gateway.sender.event.processed.pqrm.count", "Total number of events processed by Parallel Queue Removal Message(PQRM).")
- private val gatewaySenderTempEventQueueSizeMeter = GaugeStatisticMeter("gateway.sender.event.queue.size", "Size of the temporary events.", arrayOf("queueType", "temp"))
- private val gatewaySenderEventsNotQueuedConflatedMeter = CounterStatisticMeter("gateway.sender.event.notqueued.conflated.count", "Number of events received but not added to the event queue because the queue already contains an event with the event's key.")
- private val gatewaySenderEventsConflatedMeter = CounterStatisticMeter("gateway.sender.event.conflated.count", "Number of events conflated from batches.")
- private val gatewaySenderEventsDistributionMeter = CounterStatisticMeter("gateway.sender.event.distribution.count", "Number of events removed from the event queue and sent.")
- private val gatewaySenderEventExceededAlertThresholdMeter = CounterStatisticMeter("gateway.sender.event.alertthreshold.count", "Number of events exceeding the alert threshold.")
- private val gatewaySenderEventsDistributionTimer = TimerStatisticMeter("gateway.sender.event.distribution.time", "Total time spent distributing batches of events to other gateway receivers.", unit = "nanoseconds")
- private val gatewaySenderEventsBatchDistributedMeter = CounterStatisticMeter("gateway.sender.batches.distributed.count", "Number of batches of events removed from the event queue and sent.")
- private val gatewaySenderEventsBatchRedistributedMeter = CounterStatisticMeter("gateway.sender.batches.redistributed.count", "Number of batches of events removed from the event queue and resent.")
- private val gatewaySenderEventsBatchResizedMeter = CounterStatisticMeter("gateway.sender.batches.resized.count", "Number of batches that were resized because they were too large")
- private val gatewaySenderUnprocessedTokenAddedByPrimaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.token.secondary.added.count", "Number of tokens added to the secondary's unprocessed token map by the primary (though a listener).", arrayOf("actorType", "primary"))
- private val gatewaySenderUnprocessedEventAddedBySecondaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.event.secondary.added.count", "Number of events added to the secondary's unprocessed event map by the secondary.", arrayOf("actorType", "secondary"))
- private val gatewaySenderUnprocessedEventRemovedByPrimaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by the primary (though a listener).", arrayOf("actorType", "primary"))
- private val gatewaySenderUnprocessedTokenRemovedBySecondaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by the secondary.", arrayOf("actorType", "secondary"))
- private val gatewaySenderUnprocessedEventRemovedByTimeoutMeter = CounterStatisticMeter("gateway.sender.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by a timeout.", arrayOf("actorType", "timeout"))
- private val gatewaySenderUnprocessedTokenRemovedByTimeoutMeter = CounterStatisticMeter("gateway.sender.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by a timeout.", arrayOf("actorType", "timeout"))
- private val gatewaySenderUnprocessedEventSizeMeter = GaugeStatisticMeter("gateway.sender.unprocessed.event.secondary.size", "Current number of entries in the secondary's unprocessed event map.")
- private val gatewaySenderUnprocessedTokenSizeMeter = GaugeStatisticMeter("gateway.sender.unprocessed.token.secondary.size", "Current number of entries in the secondary's unprocessed token map.")
- private val gatewaySenderConflationIndexSizeMeter = GaugeStatisticMeter("gateway.sender.conflation.index.size", "Current number of entries in the conflation indexes map.")
- private val gatewaySenderEventsNotQueuedMeter = CounterStatisticMeter("gateway.sender.notQueued.count", "Number of events not added to queue.")
- private val gatewaySenderEventsDroppedSenderNotRunningMeter = CounterStatisticMeter("gateway.sender.events.dropped.count", "Number of events dropped because the primary gateway sender is not running.", arrayOf("reason", "primarySenderNotRunning"))
- private val gatewaySenderEventsDroppedFilteredMeter = CounterStatisticMeter("gateway.sender.events.filtered.count", "Number of events filtered through GatewayEventFilter.")
- private val gatewaySenderLoadBalancesCompletedMeter = CounterStatisticMeter("gateway.sender.loadBalances.completed.count", "Number of load balances completed")
- private val gatewaySenderLoadBalancesInProgressMeter = GaugeStatisticMeter("gateway.sender.loadBalances.inprogress.count", "Number of load balances in progress")
- private val gatewaySenderLoadBalancesTimer = TimerStatisticMeter("gateway.sender.loadBalances.time", "Total time spent load balancing this sender", unit = "nanoseconds")
- private val gatewaySenderSynchronizationEventsQueuedMeter = CounterStatisticMeter("gateway.sender.synchronization.events.queued.count", "Number of synchronization events added to the event queue.")
- private val gatewaySenderSynchronizationEventsSentMeter = CounterStatisticMeter("gateway.sender.synchronization.events.sent.count", "Number of synchronization events provided to other members.")
+open class GatewaySenderStats @JvmOverloads constructor(private val queueName: String, private val groupName: String = "GatewayReceiverStats-$queueName") : MicrometerMeterGroup(groupName) {
+
+ override fun getCommonTags(): Array<String> = arrayOf("gatewaySenderName", queueName) // tag key "gatewaySenderName" paired with the constructor's queueName
+ open val queueStatPrefix: String by lazy { "gateway.sender" } // prefix of every meter name declared below; it is read while this class's property initializers run, so an override must not rely on state the subclass initializes itself
+
+ private val gatewaySenderEventsReceivedMeter = CounterStatisticMeter("$queueStatPrefix.event.received.count", "Number of events received by this Sender.")
+ private val gatewaySenderEventsQueueMeter = CounterStatisticMeter("$queueStatPrefix.event.queued.count", "Number of events added to the event queue.")
+ private val gatewaySenderEventsQueueTimer = TimerStatisticMeter("$queueStatPrefix.event.queue.time", "Total time spent queueing events.", unit = "nanoseconds")
+ private val gatewaySenderEventQueueSizeMeter = GaugeStatisticMeter("$queueStatPrefix.event.queue.size", "Size of the event queue.", arrayOf("queueType", "primary")) // the three event.queue.size gauges share one name; the queueType tag separates primary / secondary / temp
+ private val gatewaySenderSecondaryEventQueueSizeMeter = GaugeStatisticMeter("$queueStatPrefix.event.queue.size", "Size of secondary event queue.", arrayOf("queueType", "secondary"))
+ private val gatewaySenderEventsProcessedByPQRMMeter = GaugeStatisticMeter("$queueStatPrefix.event.processed.pqrm.count", "Total number of events processed by Parallel Queue Removal Message(PQRM).")
+ private val gatewaySenderTempEventQueueSizeMeter = GaugeStatisticMeter("$queueStatPrefix.event.queue.size", "Size of the temporary events.", arrayOf("queueType", "temp"))
+ private val gatewaySenderEventsNotQueuedConflatedMeter = CounterStatisticMeter("$queueStatPrefix.event.notqueued.conflated.count", "Number of events received but not added to the event queue because the queue already contains an event with the event's key.")
+ private val gatewaySenderEventsConflatedMeter = CounterStatisticMeter("$queueStatPrefix.event.conflated.count", "Number of events conflated from batches.")
+ private val gatewaySenderEventsDistributionMeter = CounterStatisticMeter("$queueStatPrefix.event.distribution.count", "Number of events removed from the event queue and sent.")
+ private val gatewaySenderEventExceededAlertThresholdMeter = CounterStatisticMeter("$queueStatPrefix.event.alertthreshold.count", "Number of events exceeding the alert threshold.")
+ private val gatewaySenderEventsDistributionTimer = TimerStatisticMeter("$queueStatPrefix.event.distribution.time", "Total time spent distributing batches of events to other gateway receivers.", unit = "nanoseconds")
+ private val gatewaySenderEventsBatchDistributedMeter = CounterStatisticMeter("$queueStatPrefix.batches.distributed.count", "Number of batches of events removed from the event queue and sent.")
+ private val gatewaySenderEventsBatchRedistributedMeter = CounterStatisticMeter("$queueStatPrefix.batches.redistributed.count", "Number of batches of events removed from the event queue and resent.")
+ private val gatewaySenderEventsBatchResizedMeter = CounterStatisticMeter("$queueStatPrefix.batches.resized.count", "Number of batches that were resized because they were too large")
+ private val gatewaySenderUnprocessedTokenAddedByPrimaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.added.count", "Number of tokens added to the secondary's unprocessed token map by the primary (through a listener).", arrayOf("actorType", "primary"))
+ private val gatewaySenderUnprocessedEventAddedBySecondaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.added.count", "Number of events added to the secondary's unprocessed event map by the secondary.", arrayOf("actorType", "secondary"))
+ private val gatewaySenderUnprocessedEventRemovedByPrimaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by the primary (through a listener).", arrayOf("actorType", "primary")) // same meter name as the timeout variant of event removal; the actorType tag tells them apart
+ private val gatewaySenderUnprocessedTokenRemovedBySecondaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by the secondary.", arrayOf("actorType", "secondary")) // same meter name as the timeout variant of token removal; the actorType tag tells them apart
+ private val gatewaySenderUnprocessedEventRemovedByTimeoutMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by a timeout.", arrayOf("actorType", "timeout"))
+ private val gatewaySenderUnprocessedTokenRemovedByTimeoutMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by a timeout.", arrayOf("actorType", "timeout"))
+ private val gatewaySenderUnprocessedEventSizeMeter = GaugeStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.size", "Current number of entries in the secondary's unprocessed event map.")
+ private val gatewaySenderUnprocessedTokenSizeMeter = GaugeStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.size", "Current number of entries in the secondary's unprocessed token map.")
+ private val gatewaySenderConflationIndexSizeMeter = GaugeStatisticMeter("$queueStatPrefix.conflation.index.size", "Current number of entries in the conflation indexes map.")
+ private val gatewaySenderEventsNotQueuedMeter = CounterStatisticMeter("$queueStatPrefix.notQueued.count", "Number of events not added to queue.") // NOTE(review): camel-case "notQueued" here vs lower-case "notqueued" in the conflated-events meter name — confirm which spelling is intended
+ private val gatewaySenderEventsDroppedSenderNotRunningMeter = CounterStatisticMeter("$queueStatPrefix.events.dropped.count", "Number of events dropped because the primary gateway sender is not running.", arrayOf("reason", "primarySenderNotRunning")) // the drop cause is carried in the "reason" tag rather than in the meter name
+ private val gatewaySenderEventsDroppedFilteredMeter = CounterStatisticMeter("$queueStatPrefix.events.filtered.count", "Number of events filtered through GatewayEventFilter.")
+ private val gatewaySenderLoadBalancesCompletedMeter = CounterStatisticMeter("$queueStatPrefix.loadBalances.completed.count", "Number of load balances completed")
+ private val gatewaySenderLoadBalancesInProgressMeter = GaugeStatisticMeter("$queueStatPrefix.loadBalances.inprogress.count", "Number of load balances in progress") // a gauge, unlike the completed counter above
+ private val gatewaySenderLoadBalancesTimer = TimerStatisticMeter("$queueStatPrefix.loadBalances.time", "Total time spent load balancing this sender", unit = "nanoseconds")
+ private val gatewaySenderSynchronizationEventsQueuedMeter = CounterStatisticMeter("$queueStatPrefix.synchronization.events.queued.count", "Number of synchronization events added to the event queue.")
+ private val gatewaySenderSynchronizationEventsSentMeter = CounterStatisticMeter("$queueStatPrefix.synchronization.events.sent.count", "Number of synchronization events provided to other members.")
override fun initializeStaticMeters() {
registerMeter(gatewaySenderEventsReceivedMeter)