You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2018/08/01 18:49:51 UTC

[geode] branch feature/CleanMicrometer updated: WIP: cleaned up more imports and added CCUStats.kt, AsyncEventQueueStats.kt

This is an automated email from the ASF dual-hosted git repository.

udo pushed a commit to branch feature/CleanMicrometer
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/CleanMicrometer by this push:
     new ee0c87d  WIP: cleaned up more imports and added CCUStats.kt, AsyncEventQueueStats.kt
ee0c87d is described below

commit ee0c87d107ce34804946dc1c4d86e44547247f66
Author: Udo Kohlmeyer <uk...@pivotal.io>
AuthorDate: Wed Aug 1 11:49:36 2018 -0700

    WIP: cleaned up more imports and added CCUStats.kt, AsyncEventQueueStats.kt
---
 .../asyncqueue/internal/AsyncEventQueueImpl.java   |   1 +
 .../cache/client/internal/ConnectionImpl.java      |   1 +
 .../cache/client/internal/GetEventValueOp.java     |   1 +
 .../cache/query/internal/SortedResultsBag.java     |   1 +
 .../geode/cache/server/internal/LoadMonitor.java   |  16 +-
 .../internal/membership/gms/Services.java          |   2 +-
 .../distributed/internal/tcpserver/TcpServer.java  |   8 +-
 .../geode/internal/admin/StatAlertDefinition.java  |   3 -
 .../internal/cache/BucketPersistenceAdvisor.java   |   1 +
 .../org/apache/geode/internal/cache/HARegion.java  |   1 +
 .../internal/cache/control/HeapMemoryMonitor.java  |   3 -
 .../internal/cache/tier/sockets/BaseCommand.java   |   4 +-
 .../cache/tier/sockets/CacheClientUpdater.java     |  11 +-
 .../internal/cache/tier/sockets/command/Get70.java |   9 +-
 .../monitoring/ThreadsMonitoringProcess.java       |   2 +-
 .../internal/beans/MemberMBeanBridge.java          |   1 -
 .../org/apache/geode/statistics/cache/CCUStats.kt  |  53 ++++++
 .../geode/statistics/cache/CacheServerStats.kt     |  10 +-
 .../geode/statistics/wan/AsyncEventQueueStats.java | 186 ---------------------
 .../geode/statistics/wan/AsyncEventQueueStats.kt   |  21 +++
 .../geode/statistics/wan/GatewaySenderStats.kt     |  73 ++++----
 21 files changed, 143 insertions(+), 265 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
index d011e28..69c5ad4 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/asyncqueue/internal/AsyncEventQueueImpl.java
@@ -30,6 +30,7 @@ import org.apache.geode.internal.cache.RegionQueue;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
 import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor;
 import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import org.apache.geode.statistics.wan.AsyncEventQueueStats;
 
 public class AsyncEventQueueImpl implements AsyncEventQueue {
 
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
index 26de256..12a52e8 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ConnectionImpl.java
@@ -44,6 +44,7 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.statistics.client.connection.ConnectionStats;
 
 /**
  * A single client to server connection.
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java
index a3f7494..e2bc3e9 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/GetEventValueOp.java
@@ -20,6 +20,7 @@ import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.Part;
+import org.apache.geode.statistics.client.connection.ConnectionStats;
 
 /**
  * Gets (full) value (unlike GetOp, which may get either a full value or a delta depending upon
diff --git a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java
index af50e8d..2195e26 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/query/internal/SortedResultsBag.java
@@ -25,6 +25,7 @@ import org.apache.geode.cache.query.internal.types.CollectionTypeImpl;
 import org.apache.geode.cache.query.types.CollectionType;
 import org.apache.geode.cache.query.types.ObjectType;
 import org.apache.geode.internal.cache.CachePerfStats;
+import org.apache.geode.statistics.cache.CachePerfStats;
 
 /**
  * This results set is used to sort the data allowing duplicates. If the data being added is already
diff --git a/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java b/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
index f0b50e8..a7dd6c8 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
@@ -28,18 +28,17 @@ import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.distributed.internal.membership.MembershipManager;
 import org.apache.geode.internal.cache.CacheServerAdvisor;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
-import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.ConnectionListener;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
+import org.apache.geode.statistics.cache.CacheServerStats;
 
 /**
- * A class which monitors the load on a bridge server and periodically sends updates to the locator.
- *
+ * A class which monitors the load on a bridge server and periodically sends updates to the
+ * locator.
  * @since GemFire 5.7
- *
  */
 public class LoadMonitor implements ConnectionListener {
   private static final Logger logger = LogService.getLogger();
@@ -53,7 +52,7 @@ public class LoadMonitor implements ConnectionListener {
   protected CacheServerStats stats;
 
   public LoadMonitor(ServerLoadProbe probe, int maxConnections, long pollInterval,
-      int forceUpdateFrequency, CacheServerAdvisor advisor) {
+                     int forceUpdateFrequency, CacheServerAdvisor advisor) {
     this.probe = probe;
     this.metrics = new ServerMetricsImpl(maxConnections);
     this.pollingThread = new PollingThread(pollInterval, forceUpdateFrequency);
@@ -70,7 +69,8 @@ public class LoadMonitor implements ConnectionListener {
     this.location = location;
     this.pollingThread.start();
     this.stats = cacheServerStats;
-    this.stats.setLoad(lastLoad);
+    this.stats.setLoad(lastLoad.getConnectionLoad(), lastLoad.getLoadPerConnection(),
+        lastLoad.getSubscriptionConnectionLoad(), lastLoad.getLoadPerSubscriptionConnection());
   }
 
   /**
@@ -113,7 +113,6 @@ public class LoadMonitor implements ConnectionListener {
   /**
    * Keeps track of the clients that have added a queue since the last load was sent to the
    * server-locator.
-   *
    * @since GemFire 5.7
    */
   protected final ArrayList clientIds = new ArrayList();
@@ -199,7 +198,8 @@ public class LoadMonitor implements ConnectionListener {
                   locators);
             }
 
-            stats.setLoad(load);
+            stats.setLoad(load.getConnectionLoad(), load.getLoadPerConnection(),
+                load.getSubscriptionConnectionLoad(), load.getLoadPerSubscriptionConnection());
             if (locators != null) {
               CacheServerLoadMessage message =
                   new CacheServerLoadMessage(load, location, myClientIds);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
index eb73fe6..b37e810 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/Services.java
@@ -21,7 +21,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelCriterion;
 import org.apache.geode.ForcedDisconnectException;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
-import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.InternalLocator;
 import org.apache.geode.distributed.internal.membership.DistributedMembershipListener;
@@ -47,6 +46,7 @@ import org.apache.geode.internal.logging.LoggingThreadGroup;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.security.SecurityServiceFactory;
 import org.apache.geode.security.AuthenticationFailedException;
+import org.apache.geode.statistics.distributed.DMStats;
 
 @SuppressWarnings("ConstantConditions")
 public class Services {
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index f9278e0..8a1d858 100755
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -46,7 +46,6 @@ import org.apache.geode.cache.IncompatibleVersionException;
 import org.apache.geode.cache.UnsupportedVersionException;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.distributed.internal.DistributionConfigImpl;
-import org.apache.geode.distributed.internal.DistributionStats;
 import org.apache.geode.distributed.internal.InternalConfigurationPersistenceService;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.InternalLocator;
@@ -356,7 +355,7 @@ public class TcpServer {
    */
   private void processRequest(final Socket socket) {
     executor.execute(() -> {
-      long startTime = DistributionStats.getStatTime();
+      long startTime = System.nanoTime();
       DataInputStream input = null;
       try {
         socket.setSoTimeout(READ_TIMEOUT);
@@ -486,7 +485,7 @@ public class TcpServer {
 
       handler.endRequest(request, startTime);
 
-      startTime = DistributionStats.getStatTime();
+      startTime = System.nanoTime();
       if (response != null) {
         DataOutputStream output = new DataOutputStream(socket.getOutputStream());
         if (versionOrdinal != Version.CURRENT_ORDINAL) {
@@ -523,8 +522,7 @@ public class TcpServer {
 
     try {
       ClientProtocolService clientProtocolService = clientProtocolServiceLoader.lookupService();
-      clientProtocolService.initializeStatistics("LocatorStats",
-          internalLocator.getDistributedSystem().getStatisticsFactory());
+      clientProtocolService.initializeStatistics("LocatorStats");
       try (ClientProtocolProcessor pipeline = clientProtocolService.createProcessorForLocator(
           internalLocator, internalLocator.getCache().getSecurityService())) {
         while (!pipeline.socketProcessingIsFinished()) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java
index 34540a2..d7e432b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/admin/StatAlertDefinition.java
@@ -15,7 +15,6 @@
 package org.apache.geode.internal.admin;
 
 import org.apache.geode.DataSerializable;
-import org.apache.geode.statistics.StatisticsFactory;
 import org.apache.geode.internal.admin.statalerts.StatisticInfo;
 
 /**
@@ -74,8 +73,6 @@ public interface StatAlertDefinition extends DataSerializable {
    */
   Number[] getValue(Number[] vals);
 
-  boolean verify(StatisticsFactory factory);
-
   boolean hasDecorator(String decoratorID);
 
   StatAlertDefinition getDecorator(String decoratorID);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
index e6f9ded..22b657b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketPersistenceAdvisor.java
@@ -42,6 +42,7 @@ import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
 import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.util.TransformUtils;
+import org.apache.geode.statistics.disk.DiskRegionStats;
 
 public class BucketPersistenceAdvisor extends PersistenceAdvisorImpl {
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
index cafc43f..16e8ffe 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/HARegion.java
@@ -47,6 +47,7 @@ import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.offheap.annotations.Released;
+import org.apache.geode.statistics.cache.CachePerfStats;
 
 /**
  * This region is being implemented to suppress distribution of puts and to allow localDestroys on
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
index a45b846..737937b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/control/HeapMemoryMonitor.java
@@ -47,9 +47,6 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.LoggingThreadGroup;
 import org.apache.geode.internal.logging.log4j.LocalizedMessage;
-import org.apache.geode.internal.statistics.GemFireStatSampler;
-import org.apache.geode.internal.statistics.LocalStatListener;
-import org.apache.geode.internal.statistics.StatisticsImpl;
 import org.apache.geode.statistics.resourcemanger.ResourceManagerStats;
 
 /**
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 82cd631..007c9b6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -48,7 +48,6 @@ import org.apache.geode.cache.persistence.PartitionOfflineException;
 import org.apache.geode.cache.query.types.CollectionType;
 import org.apache.geode.distributed.DistributedSystemDisconnectedException;
 import org.apache.geode.distributed.internal.DistributionConfig;
-import org.apache.geode.distributed.internal.DistributionStats;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.Assert;
 import org.apache.geode.internal.Version;
@@ -81,6 +80,7 @@ import org.apache.geode.internal.offheap.OffHeapHelper;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.security.GemFireSecurityException;
+import org.apache.geode.statistics.cache.CacheServerStats;
 
 public abstract class BaseCommand implements Command {
   protected static final Logger logger = LogService.getLogger();
@@ -141,7 +141,7 @@ public abstract class BaseCommand implements Command {
   public void execute(Message clientMessage, ServerConnection serverConnection,
       SecurityService securityService) {
     // Read the request and update the statistics
-    long start = DistributionStats.getStatTime();
+    long start = System.nanoTime();
     if (EntryLogger.isEnabled() && serverConnection != null) {
       EntryLogger.setSource(serverConnection.getMembershipID(), "c2s");
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index e84ba27..9c6da5b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -37,11 +37,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.geode.CancelException;
 import org.apache.geode.DataSerializer;
 import org.apache.geode.InvalidDeltaException;
-import org.apache.geode.statistics.StatisticDescriptor;
-import org.apache.geode.statistics.Statistics;
-import org.apache.geode.statistics.StatisticsFactory;
-import org.apache.geode.statistics.StatisticsType;
-import org.apache.geode.statistics.StatisticsTypeFactory;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.InterestResultPolicy;
 import org.apache.geode.cache.Operation;
@@ -88,10 +83,10 @@ import org.apache.geode.internal.logging.log4j.LogMarker;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.internal.sequencelog.EntryLogger;
-import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
 import org.apache.geode.security.AuthenticationFailedException;
 import org.apache.geode.security.AuthenticationRequiredException;
 import org.apache.geode.security.GemFireSecurityException;
+import org.apache.geode.statistics.cache.CCUStats;
 
 /**
  * {@code CacheClientUpdater} is a thread that processes update messages from a cache server and
@@ -288,7 +283,7 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
     // this holds the connection which this threads reads
     this.eManager = eManager;
     this.endpoint = endpoint;
-    this.stats = new CCUStats(this.system.getStatisticsFactory(), this.location);
+    this.stats = new CCUStats(this.location.toString());
 
     // Create the connection...
     final boolean isDebugEnabled = logger.isDebugEnabled();
@@ -554,8 +549,6 @@ public class CacheClientUpdater extends Thread implements ClientUpdater, Disconn
       // ignore
     }
 
-    this.stats.close();
-
     // close the helper
     if (this.cacheHelper != null) {
       this.cacheHelper.close();
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
index e3da197..0497259 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Get70.java
@@ -20,7 +20,6 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.client.internal.GetOp;
 import org.apache.geode.cache.operations.GetOperationContext;
 import org.apache.geode.cache.operations.internal.GetOperationContextImpl;
-import org.apache.geode.distributed.internal.DistributionStats;
 import org.apache.geode.i18n.StringId;
 import org.apache.geode.internal.cache.CachedDeserializable;
 import org.apache.geode.internal.cache.LocalRegion;
@@ -31,7 +30,6 @@ import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
-import org.apache.geode.internal.cache.tier.sockets.CacheServerStats;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.cache.tier.sockets.Message;
 import org.apache.geode.internal.cache.tier.sockets.Part;
@@ -47,6 +45,7 @@ import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.NotAuthorizedException;
 import org.apache.geode.security.ResourcePermission.Operation;
 import org.apache.geode.security.ResourcePermission.Resource;
+import org.apache.geode.statistics.cache.CacheServerStats;
 
 public class Get70 extends BaseCommand {
 
@@ -71,7 +70,7 @@ public class Get70 extends BaseCommand {
     // requiresResponse = true;
     {
       long oldStart = start;
-      start = DistributionStats.getStatTime();
+      start = System.nanoTime();
       stats.incReadGetRequestTime(start - oldStart);
     }
     // Retrieve the data from the message parts
@@ -194,7 +193,7 @@ public class Get70 extends BaseCommand {
       data = securityService.postProcess(regionName, key, data, entry.isObject);
 
       long oldStart = start;
-      start = DistributionStats.getStatTime();
+      start = System.nanoTime();
       stats.incProcessGetTime(start - oldStart);
 
       if (region instanceof PartitionedRegion) {
@@ -220,7 +219,7 @@ public class Get70 extends BaseCommand {
       logger.debug("{}: Wrote get response back to {} for region {} {}", serverConnection.getName(),
           serverConnection.getSocketString(), regionName, entry);
     }
-    stats.incWriteGetResponseTime(DistributionStats.getStatTime() - start);
+    stats.incWriteGetResponseTime(System.nanoTime() - start);
 
 
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
index 166346f..652253b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/monitoring/ThreadsMonitoringProcess.java
@@ -26,9 +26,9 @@ import org.apache.geode.distributed.internal.DistributionConfigImpl;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.ResourceManagerStats;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.monitoring.executor.AbstractExecutor;
+import org.apache.geode.statistics.resourcemanger.ResourceManagerStats;
 
 
 public class ThreadsMonitoringProcess extends TimerTask {
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
index 7dffd7b..d5aef60 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/beans/MemberMBeanBridge.java
@@ -42,7 +42,6 @@ import javax.management.ObjectName;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.internal.statistics.InternalDistributedSystemStats;
-import org.apache.geode.statistics.Statistics;
 import org.apache.geode.statistics.StatisticsType;
 import org.apache.geode.cache.DiskStore;
 import org.apache.geode.cache.Region;
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CCUStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CCUStats.kt
new file mode 100644
index 0000000..8d27e57
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CCUStats.kt
@@ -0,0 +1,53 @@
+package org.apache.geode.statistics.cache
+
+import org.apache.geode.internal.cache.tier.sockets.MessageStats
+import org.apache.geode.statistics.internal.micrometer.CounterStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.GaugeStatisticMeter
+import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
+
+/**
+ * Stats for a CacheClientUpdater: total bytes received from the server, plus the number and
+ * byte size of messages currently being received or processed.
+ *
+ * @since GemFire 5.7
+ */
+class CCUStats internal constructor(private val serverLocation: String) : MicrometerMeterGroup("CacheClientUpdaterStats-$serverLocation"), MessageStats {
+
+    override fun getCommonTags(): Array<String> = arrayOf("serverLocation", serverLocation)
+
+    private val clientUpdaterReceivedBytesMeter = CounterStatisticMeter("client.updater.received.bytes.count", "Total number of bytes received from the server.", unit = "bytes")
+    private val clientUpdateMessagesReceivedMeter = GaugeStatisticMeter("client.updater.received.messages.count", "Current number of message being received off the network or being processed after reception.")
+    private val clientUpdateMessagesReceivedBytesMeter = GaugeStatisticMeter("client.updater.received.messages.bytes", "Current number of bytes consumed by messages being received or processed.", unit = "bytes")
+
+    override fun initializeStaticMeters() {
+        registerMeter(clientUpdaterReceivedBytesMeter)
+        registerMeter(clientUpdateMessagesReceivedMeter)
+        registerMeter(clientUpdateMessagesReceivedBytesMeter)
+    }
+
+    override fun incReceivedBytes(bytes: Long) {
+        clientUpdaterReceivedBytesMeter.increment(bytes)
+    }
+
+    override fun incSentBytes(v: Long) {
+        // noop since we never send messages
+    }
+
+    override fun incMessagesBytesBeingReceived(bytes: Int) {
+        clientUpdateMessagesReceivedBytesMeter.increment(bytes)
+    }
+
+    override fun decMessagesBytesBeingReceived(bytes: Int) {
+        clientUpdateMessagesReceivedBytesMeter.decrement(bytes)
+    }
+
+    fun incMessagesBeingReceived(bytes: Int) {
+        clientUpdateMessagesReceivedMeter.increment()
+        incMessagesBytesBeingReceived(bytes)
+    }
+
+    fun decMessagesBeingReceived(bytes: Int) {
+        clientUpdateMessagesReceivedMeter.decrement()
+        decMessagesBytesBeingReceived(bytes)
+    }
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
index fe20d90..1851e52 100644
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/cache/CacheServerStats.kt
@@ -605,11 +605,11 @@ open class CacheServerStats @JvmOverloads constructor(private val ownerName: Str
         cacheServerClientReadyResponseWrittenMeter.increment()
     }
 
-    fun setLoad(connectionLoad: Int, loadPerConnection: Int, queueLoad: Int, loadPerQueue: Int) {
-        cacheServerConnectionLoadMeter.increment(connectionLoad)
-        cacheServerLoadPerConnectionMeter.increment(loadPerConnection)
-        cacheServerQueueLoadMeter.increment(queueLoad)
-        cacheServerLoadPerQueueMeter.increment(loadPerQueue)
+    fun setLoad(connectionLoad: Float, loadPerConnection: Float, queueLoad: Float, loadPerQueue: Float) {
+        cacheServerConnectionLoadMeter.increment(connectionLoad.toDouble())
+        cacheServerLoadPerConnectionMeter.increment(loadPerConnection.toDouble())
+        cacheServerQueueLoadMeter.increment(queueLoad.toDouble())
+        cacheServerLoadPerQueueMeter.increment(loadPerQueue.toDouble())
     }
 
     val cnxPoolHelper: PoolStatHelper
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.java b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.java
deleted file mode 100644
index 746dd1b..0000000
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.statistics.wan;
-
-import org.apache.geode.statistics.StatisticDescriptor;
-import org.apache.geode.statistics.StatisticsFactory;
-import org.apache.geode.statistics.StatisticsType;
-import org.apache.geode.statistics.StatisticsTypeFactory;
-import org.apache.geode.internal.cache.wan.GatewaySenderStats;
-import org.apache.geode.internal.statistics.StatisticsTypeFactoryImpl;
-
-public class AsyncEventQueueStats extends GatewaySenderStats {
-
-  public static final String typeName = "AsyncEventQueueStatistics";
-
-  /**
-   * The <code>StatisticsType</code> of the statistics
-   */
-  private StatisticsType type;
-
-  @Override
-  protected void initializeStats(StatisticsFactory factory) {
-    type = factory.createType(typeName, "Stats for activity in the AsyncEventQueue",
-        new StatisticDescriptor[]{
-            factory.createIntCounter(GatewaySenderStats.EVENTS_RECEIVED, "Number of events received by this queue.",
-                "operations"),
-            factory.createIntCounter(GatewaySenderStats.EVENTS_QUEUED, "Number of events added to the event queue.",
-                "operations"),
-            factory.createLongCounter(GatewaySenderStats.EVENT_QUEUE_TIME, "Total time spent queueing events.",
-                "nanoseconds"),
-            factory.createIntGauge(GatewaySenderStats.EVENT_QUEUE_SIZE, "Size of the event queue.", "operations",
-                false),
-            factory.createIntGauge(GatewaySenderStats.SECONDARY_EVENT_QUEUE_SIZE, "Size of the secondary event queue.",
-                "operations", false),
-            factory.createIntGauge(GatewaySenderStats.EVENTS_PROCESSED_BY_PQRM,
-                "Total number of events processed by Parallel Queue Removal Message(PQRM).",
-                "operations", false),
-            factory.createIntGauge(GatewaySenderStats.TMP_EVENT_QUEUE_SIZE, "Size of the temporary events queue.",
-                "operations", false),
-            factory.createIntCounter(GatewaySenderStats.EVENTS_NOT_QUEUED_CONFLATED,
-                "Number of events received but not added to the event queue because the queue already contains an event with the event's key.",
-                "operations"),
-            factory.createIntCounter(GatewaySenderStats.EVENTS_CONFLATED_FROM_BATCHES,
-                "Number of events conflated from batches.", "operations"),
-            factory.createIntCounter(GatewaySenderStats.EVENTS_DISTRIBUTED,
-                "Number of events removed from the event queue and sent.", "operations"),
-            factory.createIntCounter(GatewaySenderStats.EVENTS_EXCEEDING_ALERT_THRESHOLD,
-                "Number of events exceeding the alert threshold.", "operations", false),
-            factory.createLongCounter(GatewaySenderStats.BATCH_DISTRIBUTION_TIME,
-                "Total time spent distributing batches of events to receivers.", "nanoseconds"),
-            factory.createIntCounter(GatewaySenderStats.BATCHES_DISTRIBUTED,
-                "Number of batches of events removed from the event queue and sent.", "operations"),
-            factory.createIntCounter(GatewaySenderStats.BATCHES_REDISTRIBUTED,
-                "Number of batches of events removed from the event queue and resent.",
-                "operations", false),
-            factory.createIntCounter(GatewaySenderStats.UNPROCESSED_TOKENS_ADDED_BY_PRIMARY,
-                "Number of tokens added to the secondary's unprocessed token map by the primary (though a listener).",
-                "tokens"),
-            factory.createIntCounter(GatewaySenderStats.UNPROCESSED_EVENTS_ADDED_BY_SECONDARY,
-                "Number of events added to the secondary's unprocessed event map by the secondary.",
-                "events"),
-            factory.createIntCounter(GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY,
-                "Number of events removed from the secondary's unprocessed event map by the primary (though a listener).",
-                "events"),
-            factory.createIntCounter(GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY,
-                "Number of tokens removed from the secondary's unprocessed token map by the secondary.",
-                "tokens"),
-            factory.createIntCounter(GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT,
-                "Number of events removed from the secondary's unprocessed event map by a timeout.",
-                "events"),
-            factory.createIntCounter(GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT,
-                "Number of tokens removed from the secondary's unprocessed token map by a timeout.",
-                "tokens"),
-            factory.createIntGauge(GatewaySenderStats.UNPROCESSED_EVENT_MAP_SIZE,
-                "Current number of entries in the secondary's unprocessed event map.", "events",
-                false),
-            factory.createIntGauge(GatewaySenderStats.UNPROCESSED_TOKEN_MAP_SIZE,
-                "Current number of entries in the secondary's unprocessed token map.", "tokens",
-                false),
-            factory.createIntGauge(GatewaySenderStats.CONFLATION_INDEXES_MAP_SIZE,
-                "Current number of entries in the conflation indexes map.", "events"),
-            factory.createIntCounter(GatewaySenderStats.NOT_QUEUED_EVENTS, "Number of events not added to queue.",
-                "events"),
-            factory.createIntCounter(
-                GatewaySenderStats.EVENTS_DROPPED_DUE_TO_PRIMARY_SENDER_NOT_RUNNING,
-                "Number of events dropped because the primary gateway sender is not running.",
-                "events"),
-            factory.createIntCounter(GatewaySenderStats.EVENTS_FILTERED,
-                "Number of events filtered through GatewayEventFilter.", "events"),
-            factory.createIntCounter(GatewaySenderStats.LOAD_BALANCES_COMPLETED, "Number of load balances completed",
-                "operations"),
-            factory.createIntGauge(GatewaySenderStats.LOAD_BALANCES_IN_PROGRESS, "Number of load balances in progress",
-                "operations"),
-            factory.createLongCounter(GatewaySenderStats.LOAD_BALANCE_TIME,
-                "Total time spent load balancing this sender",
-                "nanoseconds"),
-            factory.createIntCounter(GatewaySenderStats.SYNCHRONIZATION_EVENTS_ENQUEUED,
-                "Number of synchronization events added to the event queue.", "operations"),
-            factory.createIntCounter(GatewaySenderStats.SYNCHRONIZATION_EVENTS_PROVIDED,
-                "Number of synchronization events provided to other members.", "operations"),});
-
-    // Initialize id fields
-    GatewaySenderStats.eventsReceivedId = type.nameToId(GatewaySenderStats.EVENTS_RECEIVED);
-    GatewaySenderStats.eventsQueuedId = type.nameToId(GatewaySenderStats.EVENTS_QUEUED);
-    GatewaySenderStats.eventsNotQueuedConflatedId = type.nameToId(
-        GatewaySenderStats.EVENTS_NOT_QUEUED_CONFLATED);
-    GatewaySenderStats.eventQueueTimeId = type.nameToId(GatewaySenderStats.EVENT_QUEUE_TIME);
-    GatewaySenderStats.eventQueueSizeId = type.nameToId(GatewaySenderStats.EVENT_QUEUE_SIZE);
-    GatewaySenderStats.secondaryEventQueueSizeId = type.nameToId(
-        GatewaySenderStats.SECONDARY_EVENT_QUEUE_SIZE);
-    GatewaySenderStats.eventsProcessedByPQRMId = type.nameToId(
-        GatewaySenderStats.EVENTS_PROCESSED_BY_PQRM);
-    GatewaySenderStats.eventTmpQueueSizeId = type.nameToId(GatewaySenderStats.TMP_EVENT_QUEUE_SIZE);
-    GatewaySenderStats.eventsDistributedId = type.nameToId(GatewaySenderStats.EVENTS_DISTRIBUTED);
-    GatewaySenderStats.eventsExceedingAlertThresholdId = type.nameToId(
-        GatewaySenderStats.EVENTS_EXCEEDING_ALERT_THRESHOLD);
-    GatewaySenderStats.batchDistributionTimeId = type.nameToId(
-        GatewaySenderStats.BATCH_DISTRIBUTION_TIME);
-    GatewaySenderStats.batchesDistributedId = type.nameToId(GatewaySenderStats.BATCHES_DISTRIBUTED);
-    GatewaySenderStats.batchesRedistributedId = type.nameToId(
-        GatewaySenderStats.BATCHES_REDISTRIBUTED);
-    GatewaySenderStats.unprocessedTokensAddedByPrimaryId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_TOKENS_ADDED_BY_PRIMARY);
-    GatewaySenderStats.unprocessedEventsAddedBySecondaryId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_EVENTS_ADDED_BY_SECONDARY);
-    GatewaySenderStats.unprocessedEventsRemovedByPrimaryId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_PRIMARY);
-    GatewaySenderStats.unprocessedTokensRemovedBySecondaryId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_SECONDARY);
-    GatewaySenderStats.unprocessedEventsRemovedByTimeoutId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_EVENTS_REMOVED_BY_TIMEOUT);
-    GatewaySenderStats.unprocessedTokensRemovedByTimeoutId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_TOKENS_REMOVED_BY_TIMEOUT);
-    GatewaySenderStats.unprocessedEventMapSizeId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_EVENT_MAP_SIZE);
-    GatewaySenderStats.unprocessedTokenMapSizeId = type.nameToId(
-        GatewaySenderStats.UNPROCESSED_TOKEN_MAP_SIZE);
-    GatewaySenderStats.conflationIndexesMapSizeId = type.nameToId(
-        GatewaySenderStats.CONFLATION_INDEXES_MAP_SIZE);
-    GatewaySenderStats.notQueuedEventsId = type.nameToId(GatewaySenderStats.NOT_QUEUED_EVENTS);
-    GatewaySenderStats.eventsDroppedDueToPrimarySenderNotRunningId =
-        type.nameToId(GatewaySenderStats.EVENTS_DROPPED_DUE_TO_PRIMARY_SENDER_NOT_RUNNING);
-    GatewaySenderStats.eventsFilteredId = type.nameToId(GatewaySenderStats.EVENTS_FILTERED);
-    GatewaySenderStats.eventsConflatedFromBatchesId = type.nameToId(
-        GatewaySenderStats.EVENTS_CONFLATED_FROM_BATCHES);
-    GatewaySenderStats.loadBalancesCompletedId = type.nameToId(
-        GatewaySenderStats.LOAD_BALANCES_COMPLETED);
-    GatewaySenderStats.loadBalancesInProgressId = type.nameToId(
-        GatewaySenderStats.LOAD_BALANCES_IN_PROGRESS);
-    GatewaySenderStats.loadBalanceTimeId = type.nameToId(GatewaySenderStats.LOAD_BALANCE_TIME);
-    GatewaySenderStats.synchronizationEventsEnqueuedId = type.nameToId(
-        GatewaySenderStats.SYNCHRONIZATION_EVENTS_ENQUEUED);
-    GatewaySenderStats.synchronizationEventsProvidedId = type.nameToId(
-        GatewaySenderStats.SYNCHRONIZATION_EVENTS_PROVIDED);
-  }
-
-  /**
-   * Constructor.
-   * @param factory The <code>StatisticsFactory</code> which creates the <code>Statistics</code>
-   * instance
-   * @param asyncQueueId The id of the <code>AsyncEventQueue</code> used to generate the name of
-   * the
-   * <code>Statistics</code>
-   */
-  public AsyncEventQueueStats(StatisticsFactory factory, String asyncQueueId) {
-    super();
-    initializeStats(factory);
-    this.stats = factory.createAtomicStatistics(type, "asyncEventQueueStats-" + asyncQueueId);
-  }
-
-  public StatisticsType getType() {
-    return type;
-  }
-}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.kt
new file mode 100644
index 0000000..118e2bd
--- /dev/null
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/AsyncEventQueueStats.kt
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.statistics.wan
+
+/** Meter group for one async event queue: reuses the [GatewaySenderStats] meters under the "async" name prefix, tagged with the queue name. */
+class AsyncEventQueueStats(private val asyncQueueName: String) : GatewaySenderStats(asyncQueueName, "AsyncQueueStats-$asyncQueueName") {
+    override fun getCommonTags(): Array<String> = arrayOf("asyncQueueName", asyncQueueName)
+    override val queueStatPrefix: String get() = "async" // field-less getter on purpose: the base class reads this while building its meters, before a `by lazy` delegate declared here would be initialised (NPE)
+}
diff --git a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt
index 95c2907..06ebc29 100644
--- a/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt
+++ b/geode-micrometer-stats/src/main/kotlin/org/apache/geode/statistics/wan/GatewaySenderStats.kt
@@ -20,42 +20,43 @@ import org.apache.geode.statistics.internal.micrometer.MicrometerMeterGroup
 import org.apache.geode.statistics.internal.micrometer.TimerStatisticMeter
 import org.apache.geode.statistics.util.NOW_NANOS
 
-class GatewaySenderStats(private val gateReceiverName: String) : MicrometerMeterGroup("GatewayReceiverStats-$gateReceiverName") {
-
-    override fun getCommonTags(): Array<String> = arrayOf("gatewaySenderName", gateReceiverName)
-
-    private val gatewaySenderEventsReceivedMeter = CounterStatisticMeter("gateway.sender.event.received.count", "Number of events received by this Sender.")
-    private val gatewaySenderEventsQueueMeter = CounterStatisticMeter("gateway.sender.event.queued.count", "Number of events added to the event queue.")
-    private val gatewaySenderEventsQueueTimer = TimerStatisticMeter("gateway.sender.event.queue.time", "Total time spent queueing events.", unit = "nanoseconds")
-    private val gatewaySenderEventQueueSizeMeter = GaugeStatisticMeter("gateway.sender.event.queue.size", "Size of the event queue.", arrayOf("queueType", "primary"))
-    private val gatewaySenderSecondaryEventQueueSizeMeter = GaugeStatisticMeter("gateway.sender.event.queue.size", "Size of secondary event queue.", arrayOf("queueType", "secondary"))
-    private val gatewaySenderEventsProcessedByPQRMMeter = GaugeStatisticMeter("gateway.sender.event.processed.pqrm.count", "Total number of events processed by Parallel Queue Removal Message(PQRM).")
-    private val gatewaySenderTempEventQueueSizeMeter = GaugeStatisticMeter("gateway.sender.event.queue.size", "Size of the temporary events.", arrayOf("queueType", "temp"))
-    private val gatewaySenderEventsNotQueuedConflatedMeter = CounterStatisticMeter("gateway.sender.event.notqueued.conflated.count", "Number of events received but not added to the event queue because the queue already contains an event with the event's key.")
-    private val gatewaySenderEventsConflatedMeter = CounterStatisticMeter("gateway.sender.event.conflated.count", "Number of events conflated from batches.")
-    private val gatewaySenderEventsDistributionMeter = CounterStatisticMeter("gateway.sender.event.distribution.count", "Number of events removed from the event queue and sent.")
-    private val gatewaySenderEventExceededAlertThresholdMeter = CounterStatisticMeter("gateway.sender.event.alertthreshold.count", "Number of events exceeding the alert threshold.")
-    private val gatewaySenderEventsDistributionTimer = TimerStatisticMeter("gateway.sender.event.distribution.time", "Total time spent distributing batches of events to other gateway receivers.", unit = "nanoseconds")
-    private val gatewaySenderEventsBatchDistributedMeter = CounterStatisticMeter("gateway.sender.batches.distributed.count", "Number of batches of events removed from the event queue and sent.")
-    private val gatewaySenderEventsBatchRedistributedMeter = CounterStatisticMeter("gateway.sender.batches.redistributed.count", "Number of batches of events removed from the event queue and resent.")
-    private val gatewaySenderEventsBatchResizedMeter = CounterStatisticMeter("gateway.sender.batches.resized.count", "Number of batches that were resized because they were too large")
-    private val gatewaySenderUnprocessedTokenAddedByPrimaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.token.secondary.added.count", "Number of tokens added to the secondary's unprocessed token map by the primary (though a listener).", arrayOf("actorType", "primary"))
-    private val gatewaySenderUnprocessedEventAddedBySecondaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.event.secondary.added.count", "Number of events added to the secondary's unprocessed event map by the secondary.", arrayOf("actorType", "secondary"))
-    private val gatewaySenderUnprocessedEventRemovedByPrimaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by the primary (though a listener).", arrayOf("actorType", "primary"))
-    private val gatewaySenderUnprocessedTokenRemovedBySecondaryMeter = CounterStatisticMeter("gateway.sender.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by the secondary.", arrayOf("actorType", "secondary"))
-    private val gatewaySenderUnprocessedEventRemovedByTimeoutMeter = CounterStatisticMeter("gateway.sender.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by a timeout.", arrayOf("actorType", "timeout"))
-    private val gatewaySenderUnprocessedTokenRemovedByTimeoutMeter = CounterStatisticMeter("gateway.sender.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by a timeout.", arrayOf("actorType", "timeout"))
-    private val gatewaySenderUnprocessedEventSizeMeter = GaugeStatisticMeter("gateway.sender.unprocessed.event.secondary.size", "Current number of entries in the secondary's unprocessed event map.")
-    private val gatewaySenderUnprocessedTokenSizeMeter = GaugeStatisticMeter("gateway.sender.unprocessed.token.secondary.size", "Current number of entries in the secondary's unprocessed token map.")
-    private val gatewaySenderConflationIndexSizeMeter = GaugeStatisticMeter("gateway.sender.conflation.index.size", "Current number of entries in the conflation indexes map.")
-    private val gatewaySenderEventsNotQueuedMeter = CounterStatisticMeter("gateway.sender.notQueued.count", "Number of events not added to queue.")
-    private val gatewaySenderEventsDroppedSenderNotRunningMeter = CounterStatisticMeter("gateway.sender.events.dropped.count", "Number of events dropped because the primary gateway sender is not running.", arrayOf("reason", "primarySenderNotRunning"))
-    private val gatewaySenderEventsDroppedFilteredMeter = CounterStatisticMeter("gateway.sender.events.filtered.count", "Number of events filtered through GatewayEventFilter.")
-    private val gatewaySenderLoadBalancesCompletedMeter = CounterStatisticMeter("gateway.sender.loadBalances.completed.count", "Number of load balances completed")
-    private val gatewaySenderLoadBalancesInProgressMeter = GaugeStatisticMeter("gateway.sender.loadBalances.inprogress.count", "Number of load balances in progress")
-    private val gatewaySenderLoadBalancesTimer = TimerStatisticMeter("gateway.sender.loadBalances.time", "Total time spent load balancing this sender", unit = "nanoseconds")
-    private val gatewaySenderSynchronizationEventsQueuedMeter = CounterStatisticMeter("gateway.sender.synchronization.events.queued.count", "Number of synchronization events added to the event queue.")
-    private val gatewaySenderSynchronizationEventsSentMeter = CounterStatisticMeter("gateway.sender.synchronization.events.sent.count", "Number of synchronization events provided to other members.")
+open class GatewaySenderStats @JvmOverloads constructor(private val queueName: String, private val groupName: String = "GatewayReceiverStats-$queueName") : MicrometerMeterGroup(groupName) {
+
+    override fun getCommonTags(): Array<String> = arrayOf("gatewaySenderName", queueName)
+    open val queueStatPrefix: String by lazy { "gateway.sender" }
+
+    private val gatewaySenderEventsReceivedMeter = CounterStatisticMeter("$queueStatPrefix.event.received.count", "Number of events received by this Sender.")
+    private val gatewaySenderEventsQueueMeter = CounterStatisticMeter("$queueStatPrefix.event.queued.count", "Number of events added to the event queue.")
+    private val gatewaySenderEventsQueueTimer = TimerStatisticMeter("$queueStatPrefix.event.queue.time", "Total time spent queueing events.", unit = "nanoseconds")
+    private val gatewaySenderEventQueueSizeMeter = GaugeStatisticMeter("$queueStatPrefix.event.queue.size", "Size of the event queue.", arrayOf("queueType", "primary"))
+    private val gatewaySenderSecondaryEventQueueSizeMeter = GaugeStatisticMeter("$queueStatPrefix.event.queue.size", "Size of secondary event queue.", arrayOf("queueType", "secondary"))
+    private val gatewaySenderEventsProcessedByPQRMMeter = GaugeStatisticMeter("$queueStatPrefix.event.processed.pqrm.count", "Total number of events processed by Parallel Queue Removal Message(PQRM).")
+    private val gatewaySenderTempEventQueueSizeMeter = GaugeStatisticMeter("$queueStatPrefix.event.queue.size", "Size of the temporary events.", arrayOf("queueType", "temp"))
+    private val gatewaySenderEventsNotQueuedConflatedMeter = CounterStatisticMeter("$queueStatPrefix.event.notqueued.conflated.count", "Number of events received but not added to the event queue because the queue already contains an event with the event's key.")
+    private val gatewaySenderEventsConflatedMeter = CounterStatisticMeter("$queueStatPrefix.event.conflated.count", "Number of events conflated from batches.")
+    private val gatewaySenderEventsDistributionMeter = CounterStatisticMeter("$queueStatPrefix.event.distribution.count", "Number of events removed from the event queue and sent.")
+    private val gatewaySenderEventExceededAlertThresholdMeter = CounterStatisticMeter("$queueStatPrefix.event.alertthreshold.count", "Number of events exceeding the alert threshold.")
+    private val gatewaySenderEventsDistributionTimer = TimerStatisticMeter("$queueStatPrefix.event.distribution.time", "Total time spent distributing batches of events to other gateway receivers.", unit = "nanoseconds")
+    private val gatewaySenderEventsBatchDistributedMeter = CounterStatisticMeter("$queueStatPrefix.batches.distributed.count", "Number of batches of events removed from the event queue and sent.")
+    private val gatewaySenderEventsBatchRedistributedMeter = CounterStatisticMeter("$queueStatPrefix.batches.redistributed.count", "Number of batches of events removed from the event queue and resent.")
+    private val gatewaySenderEventsBatchResizedMeter = CounterStatisticMeter("$queueStatPrefix.batches.resized.count", "Number of batches that were resized because they were too large")
+    private val gatewaySenderUnprocessedTokenAddedByPrimaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.added.count", "Number of tokens added to the secondary's unprocessed token map by the primary (though a listener).", arrayOf("actorType", "primary"))
+    private val gatewaySenderUnprocessedEventAddedBySecondaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.added.count", "Number of events added to the secondary's unprocessed event map by the secondary.", arrayOf("actorType", "secondary"))
+    private val gatewaySenderUnprocessedEventRemovedByPrimaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by the primary (though a listener).", arrayOf("actorType", "primary"))
+    private val gatewaySenderUnprocessedTokenRemovedBySecondaryMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by the secondary.", arrayOf("actorType", "secondary"))
+    private val gatewaySenderUnprocessedEventRemovedByTimeoutMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.removed.count", "Number of events removed from the secondary's unprocessed event map by a timeout.", arrayOf("actorType", "timeout"))
+    private val gatewaySenderUnprocessedTokenRemovedByTimeoutMeter = CounterStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.removed.count", "Number of tokens removed from the secondary's unprocessed token map by a timeout.", arrayOf("actorType", "timeout"))
+    private val gatewaySenderUnprocessedEventSizeMeter = GaugeStatisticMeter("$queueStatPrefix.unprocessed.event.secondary.size", "Current number of entries in the secondary's unprocessed event map.")
+    private val gatewaySenderUnprocessedTokenSizeMeter = GaugeStatisticMeter("$queueStatPrefix.unprocessed.token.secondary.size", "Current number of entries in the secondary's unprocessed token map.")
+    private val gatewaySenderConflationIndexSizeMeter = GaugeStatisticMeter("$queueStatPrefix.conflation.index.size", "Current number of entries in the conflation indexes map.")
+    private val gatewaySenderEventsNotQueuedMeter = CounterStatisticMeter("$queueStatPrefix.notQueued.count", "Number of events not added to queue.")
+    private val gatewaySenderEventsDroppedSenderNotRunningMeter = CounterStatisticMeter("$queueStatPrefix.events.dropped.count", "Number of events dropped because the primary gateway sender is not running.", arrayOf("reason", "primarySenderNotRunning"))
+    private val gatewaySenderEventsDroppedFilteredMeter = CounterStatisticMeter("$queueStatPrefix.events.filtered.count", "Number of events filtered through GatewayEventFilter.")
+    private val gatewaySenderLoadBalancesCompletedMeter = CounterStatisticMeter("$queueStatPrefix.loadBalances.completed.count", "Number of load balances completed")
+    private val gatewaySenderLoadBalancesInProgressMeter = GaugeStatisticMeter("$queueStatPrefix.loadBalances.inprogress.count", "Number of load balances in progress")
+    private val gatewaySenderLoadBalancesTimer = TimerStatisticMeter("$queueStatPrefix.loadBalances.time", "Total time spent load balancing this sender", unit = "nanoseconds")
+    private val gatewaySenderSynchronizationEventsQueuedMeter = CounterStatisticMeter("$queueStatPrefix.synchronization.events.queued.count", "Number of synchronization events added to the event queue.")
+    private val gatewaySenderSynchronizationEventsSentMeter = CounterStatisticMeter("$queueStatPrefix.synchronization.events.sent.count", "Number of synchronization events provided to other members.")
 
     override fun initializeStaticMeters() {
         registerMeter(gatewaySenderEventsReceivedMeter)