You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2021/06/28 16:44:35 UTC

[pulsar] branch master updated: [PIP-82] [pulsar-broker] Resource group metrics (#11024)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8da39f3  [PIP-82] [pulsar-broker] Resource group metrics (#11024)
8da39f3 is described below

commit 8da39f32e87bdd9be8ea73146ce17abe6fb5cc69
Author: kaushik-develop <80...@users.noreply.github.com>
AuthorDate: Mon Jun 28 09:43:52 2021 -0700

    [PIP-82] [pulsar-broker] Resource group metrics (#11024)
    
    Co-authored-by: Kaushik Ghosh <ka...@splunk.com>
---
 .../pulsar/broker/resourcegroup/ResourceGroup.java |  94 +-
 .../broker/resourcegroup/ResourceGroupService.java | 182 +++-
 .../RGUsageMTAggrWaitForAllMesgs.java              | 978 +++++++++++++++++++++
 3 files changed, 1213 insertions(+), 41 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
index 6bb1dc8..5560216 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.resourcegroup;
 
+import io.prometheus.client.Counter;
 import java.util.HashMap;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -166,7 +167,7 @@ public class ResourceGroup {
 
             // If this is the first ref, register with the transport manager.
             if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 1) {
-                log.info("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
+                log.debug("registerUsage for RG={}: registering with transport-mgr", this.resourceGroupName);
                 transportManager.registerResourceUsagePublisher(this.ruPublisher);
                 transportManager.registerResourceUsageConsumer(this.ruConsumer);
             }
@@ -179,7 +180,7 @@ public class ResourceGroup {
 
             // If this was the last ref, unregister from the transport manager.
             if (this.resourceGroupTenantRefs.size() + this.resourceGroupNamespaceRefs.size() == 0) {
-                log.info("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
+                log.debug("unRegisterUsage for RG={}: un-registering from transport-mgr", this.resourceGroupName);
                 transportManager.unregisterResourceUsageConsumer(this.ruConsumer);
                 transportManager.unregisterResourceUsagePublisher(this.ruPublisher);
             }
@@ -283,18 +284,48 @@ public class ResourceGroup {
         return retStats;
     }
 
-    protected void updateLocalQuota(ResourceGroupMonitoringClass monClass, BytesAndMessagesCount newQuota)
-                                                                                        throws PulsarAdminException {
+    protected BytesAndMessagesCount updateLocalQuota(ResourceGroupMonitoringClass monClass,
+                                                     BytesAndMessagesCount newQuota) throws PulsarAdminException {
         this.checkMonitoringClass(monClass);
+        BytesAndMessagesCount oldBMCount = new BytesAndMessagesCount();
 
         final PerMonitoringClassFields monEntity = this.monitoringClassFields[monClass.ordinal()];
         monEntity.localUsageStatsLock.lock();
+        oldBMCount = monEntity.quotaForNextPeriod;
         try {
-            monEntity.quotaForNextPeriod.bytes = newQuota.bytes;
-            monEntity.quotaForNextPeriod.messages = newQuota.messages;
+            monEntity.quotaForNextPeriod = newQuota;
         } finally {
             monEntity.localUsageStatsLock.unlock();
         }
+        log.debug("updateLocalQuota for RG={}: set local {} quota to bytes={}, messages={}",
+                this.resourceGroupName, monClass, newQuota.bytes, newQuota.messages);
+
+        return oldBMCount;
+    }
+
+    // Visibility for unit testing
+    protected static double getRgRemoteUsageByteCount (String rgName, String monClassName, String brokerName) {
+        return rgRemoteUsageReportsBytes.labels(rgName, monClassName, brokerName).get();
+    }
+
+    // Visibility for unit testing
+    protected static double getRgRemoteUsageMessageCount (String rgName, String monClassName, String brokerName) {
+        return rgRemoteUsageReportsMessages.labels(rgName, monClassName, brokerName).get();
+    }
+
+    // Visibility for unit testing
+    protected static double getRgUsageReportedCount (String rgName, String monClassName) {
+        return rgLocalUsageReportCount.labels(rgName, monClassName).get();
+    }
+
+    // Visibility for unit testing
+    protected static BytesAndMessagesCount accumulateBMCount(BytesAndMessagesCount ... bmCounts) {
+        BytesAndMessagesCount retBMCount = new BytesAndMessagesCount();
+        for (int ix = 0; ix < bmCounts.length; ix++) {
+            retBMCount.messages += bmCounts[ix].messages;
+            retBMCount.bytes += bmCounts[ix].bytes;
+        }
+        return retBMCount;
     }
 
     private void checkMonitoringClass(ResourceGroupMonitoringClass monClass) throws PulsarAdminException {
@@ -309,9 +340,6 @@ public class ResourceGroup {
     // Returns true if something was filled.
     // Visibility for unit testing.
     protected boolean setUsageInMonitoredEntity(ResourceGroupMonitoringClass monClass, NetworkUsage p) {
-        // ToDo: Report only if the local usage has changed more than the "tolerable" limit for this round, or if
-        // too much time has passed since the last report. Else, just return.
-
         long bytesUsed, messagesUsed;
         boolean sendReport;
         int numSuppressions = 0;
@@ -352,13 +380,15 @@ public class ResourceGroup {
             monEntity.localUsageStatsLock.unlock();
         }
 
-        // ToDo: make the following logs at debug level after initial bringup.
+        final String rgName = this.ruPublisher != null ? this.ruPublisher.getID() : this.resourceGroupName;
+        double sentCount = sendReport ? 1 : 0;
+        rgLocalUsageReportCount.labels(rgName, monClass.name()).inc(sentCount);
         if (sendReport) {
-            log.info("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
-                    this.resourceGroupName, monClass, bytesUsed, messagesUsed);
+            log.debug("fillResourceUsage for RG={}: filled a {} update; bytes={}, messages={}",
+                    rgName, monClass, bytesUsed, messagesUsed);
         } else {
-            log.info("fillResourceUsage for RG={}: report for {} suppressed (suppressions={} since last sent report)",
-                    this.resourceGroupName, monClass, numSuppressions);
+            log.debug("fillResourceUsage for RG={}: report for {} suppressed (suppressions={} since last sent report)",
+                    rgName, monClass, numSuppressions);
         }
 
         return sendReport;
@@ -377,24 +407,29 @@ public class ResourceGroup {
         usageStats = monEntity.usageFromOtherBrokers.get(broker);
         if (usageStats == null) {
             usageStats = new PerBrokerUsageStats();
+            usageStats.usedValues = new BytesAndMessagesCount();
         }
         monEntity.usageFromOtherBrokersLock.lock();
         try {
-            newByteCount = usageStats.usedValues.bytes = p.getBytesPerPeriod();
-            newMessageCount = usageStats.usedValues.messages = p.getMessagesPerPeriod();
+            newByteCount = p.getBytesPerPeriod();
+            usageStats.usedValues.bytes = newByteCount;
+            newMessageCount = p.getMessagesPerPeriod();
+            usageStats.usedValues.messages = newMessageCount;
             usageStats.lastResourceUsageReadTimeMSecsSinceEpoch = System.currentTimeMillis();
             oldUsageStats = monEntity.usageFromOtherBrokers.put(broker, usageStats);
         } finally {
             monEntity.usageFromOtherBrokersLock.unlock();
         }
+        rgRemoteUsageReportsBytes.labels(this.ruConsumer.getID(), monClass.name(), broker).inc(newByteCount);
+        rgRemoteUsageReportsMessages.labels(this.ruConsumer.getID(), monClass.name(), broker).inc(newMessageCount);
+
         oldByteCount = oldMessageCount = -1;
         if (oldUsageStats != null) {
             oldByteCount = oldUsageStats.usedValues.bytes;
             oldMessageCount = oldUsageStats.usedValues.messages;
         }
 
-        // ToDo: make the following log at debug level after initial bringup.
-        log.info("resourceUsageListener for RG={}: updated {} stats for broker={} "
+        log.debug("resourceUsageListener for RG={}: updated {} stats for broker={} "
                 + "with bytes={} (old ={}), messages={} (old={})",
                 this.resourceGroupName, monClass, broker,
                 newByteCount, oldByteCount,
@@ -478,9 +513,30 @@ public class ResourceGroup {
     // The creator resource-group-service [ToDo: remove later with a strict singleton ResourceGroupService]
     ResourceGroupService rgs;
 
+    // Labels for the various counters used here.
+    private static final String[] resourceGroupMontoringclassLabels = {"ResourceGroup", "MonitoringClass"};
+    private static final String[] resourceGroupMontoringclassRemotebrokerLabels =
+                                                    {"ResourceGroup", "MonitoringClass", "RemoteBroker"};
+
+    private static final Counter rgRemoteUsageReportsBytes = Counter.build()
+            .name("pulsar_resource_group_remote_usage_bytes_used")
+            .help("Bytes used reported about this <RG, monitoring class> from a remote broker")
+            .labelNames(resourceGroupMontoringclassRemotebrokerLabels)
+            .register();
+    private static final Counter rgRemoteUsageReportsMessages = Counter.build()
+            .name("pulsar_resource_group_remote_usage_messages_used")
+            .help("Messages used reported about this <RG, monitoring class> from a remote broker")
+            .labelNames(resourceGroupMontoringclassRemotebrokerLabels)
+            .register();
+
+    private static final Counter rgLocalUsageReportCount = Counter.build()
+            .name("pulsar_resource_group_local_usage_reported")
+            .help("Number of times local usage was reported (vs. suppressed due to negligible change)")
+            .labelNames(resourceGroupMontoringclassLabels)
+            .register();
+
     protected static class PerMonitoringClassFields {
         // This lock covers all the "local" counts (i.e., except for the per-broker usage stats).
-        // ToDo: Change this one to a ReadWrite lock.
         Lock localUsageStatsLock;
 
         BytesAndMessagesCount configValuesPerPeriod;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
index c112536..058f16a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupService.java
@@ -19,6 +19,8 @@
 package org.apache.pulsar.broker.resourcegroup;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Summary;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -129,11 +131,13 @@ public class ResourceGroupService {
         } catch (NullPointerException e) {
             throw new IllegalArgumentException("ResourceGroupUpdate: Invalid null ResourceGroup config");
         }
+
         ResourceGroup rg = this.getResourceGroupInternal(rgName);
         if (rg == null) {
             throw new PulsarAdminException("Resource group does not exist: " + rgName);
         }
         rg.updateResourceGroup(rgConfig);
+        rgUpdates.labels(rgName).inc();
     }
 
     public Set<String> resourceGroupGetAll() {
@@ -197,6 +201,7 @@ public class ResourceGroupService {
 
         // Associate this tenant name with the RG.
         this.tenantToRGsMap.put(tenantName, rg);
+        rgTenantRegisters.labels(resourceGroupName).inc();
     }
 
     /**
@@ -218,6 +223,7 @@ public class ResourceGroupService {
 
         // Dissociate this tenant name from the RG.
         this.tenantToRGsMap.remove(tenantName, rg);
+        rgTenantUnRegisters.labels(resourceGroupName).inc();
     }
 
     /**
@@ -248,6 +254,7 @@ public class ResourceGroupService {
 
         // Associate this NS-name with the RG.
         this.namespaceToRGsMap.put(namespaceName, rg);
+        rgNamespaceRegisters.labels(resourceGroupName).inc();
     }
 
     /**
@@ -280,6 +287,7 @@ public class ResourceGroupService {
 
         // Dissociate this NS-name from the RG.
         this.namespaceToRGsMap.remove(namespaceName, rg);
+        rgNamespaceUnRegisters.labels(resourceGroupName).inc();
     }
 
     /**
@@ -318,16 +326,23 @@ public class ResourceGroupService {
         }
 
         if (nsRG == tenantRG) {
-            // Update only once.
+            // Update only once in this case.
+            // Note that we will update both tenant and namespace RGs in other cases.
             nsRG.incrementLocalUsageStats(monClass, incStats);
+            rgLocalUsageMessages.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.messages);
+            rgLocalUsageBytes.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.bytes);
             return true;
         }
 
         if (tenantRG != null) {
             tenantRG.incrementLocalUsageStats(monClass, incStats);
+            rgLocalUsageMessages.labels(tenantRG.resourceGroupName, monClass.name()).inc(incStats.messages);
+            rgLocalUsageBytes.labels(tenantRG.resourceGroupName, monClass.name()).inc(incStats.bytes);
         }
         if (nsRG != null) {
             nsRG.incrementLocalUsageStats(monClass, incStats);
+            rgLocalUsageMessages.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.messages);
+            rgLocalUsageBytes.labels(nsRG.resourceGroupName, monClass.name()).inc(incStats.bytes);
         }
 
         return true;
@@ -409,22 +424,76 @@ public class ResourceGroupService {
 
         try {
             boolean statsUpdated = this.incrementUsage(tenantString, nsString, monClass, bmDiff);
-            log.info("aggregateResourceGroupLocalUsages monclass={} statsUpdated={} for tenant={}, namespace={}; "
+            log.debug("updateStatsWithDiff: monclass={} statsUpdated={} for tenant={}, namespace={}; "
                             + "by {} bytes, {} mesgs",
                     monClass, statsUpdated, tenantString, nsString,
                     bmDiff.bytes, bmDiff.messages);
             hm.put(topicName, bmNewCount);
         } catch (Throwable t) {
-            log.error("aggregateResourceGroupLocalUsages got ex={} while aggregating for {}} side",
+            log.error("updateStatsWithDiff: got ex={} while aggregating for {} side",
                     t.getMessage(), monClass);
         }
     }
 
+    // Visibility for testing.
+    protected static double getRgQuotaByteCount (String rgName, String monClassName) {
+        return rgCalculatedQuotaBytes.labels(rgName, monClassName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgQuotaMessageCount (String rgName, String monClassName) {
+        return rgCalculatedQuotaMessages.labels(rgName, monClassName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgLocalUsageByteCount (String rgName, String monClassName) {
+        return rgLocalUsageBytes.labels(rgName, monClassName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgLocalUsageMessageCount (String rgName, String monClassName) {
+        return rgLocalUsageMessages.labels(rgName, monClassName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgUpdatesCount (String rgName) {
+        return rgUpdates.labels(rgName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgTenantRegistersCount (String rgName) {
+        return rgTenantRegisters.labels(rgName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgTenantUnRegistersCount (String rgName) {
+        return rgTenantUnRegisters.labels(rgName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgNamespaceRegistersCount (String rgName) {
+        return rgNamespaceRegisters.labels(rgName).get();
+    }
+
+    // Visibility for testing.
+    protected static double getRgNamespaceUnRegistersCount (String rgName) {
+        return rgNamespaceUnRegisters.labels(rgName).get();
+    }
+
+    // Visibility for testing.
+    protected static Summary.Child.Value getRgUsageAggregationLatency() {
+        return rgUsageAggregationLatency.get();
+    }
+
+    // Visibility for testing.
+    protected static Summary.Child.Value getRgQuotaCalculationTime() {
+        return rgQuotaCalculationLatency.get();
+    }
+
     // Periodically aggregate the usage from all topics known to the BrokerService.
     // Visibility for unit testing.
     protected void aggregateResourceGroupLocalUsages() {
-        long mSecsSinceEpochStart, mSecsSinceEpochEnd, diffMSecs;
-        mSecsSinceEpochStart = System.currentTimeMillis();
+        final Summary.Timer aggrUsageTimer = rgUsageAggregationLatency.startTimer();
         BrokerService bs = this.pulsar.getBrokerService();
         Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
 
@@ -440,17 +509,13 @@ public class ResourceGroupService {
                 continue;
             }
 
-            this.updateStatsWithDiff(topicName, tenantString, nsString,
-                                    topicStats.bytesInCounter, topicStats.msgInCounter,
-                                    ResourceGroupMonitoringClass.Publish);
-
-            this.updateStatsWithDiff(topicName, tenantString, nsString,
-                                    topicStats.bytesOutCounter, topicStats.msgOutCounter,
-                                    ResourceGroupMonitoringClass.Dispatch);
+            for (ResourceGroupMonitoringClass monClass : ResourceGroupMonitoringClass.values()) {
+                this.updateStatsWithDiff(topicName, tenantString, nsString,
+                        topicStats.bytesInCounter, topicStats.msgInCounter, monClass);
+            }
         }
-        mSecsSinceEpochEnd = System.currentTimeMillis();
-        diffMSecs = mSecsSinceEpochEnd - mSecsSinceEpochStart;
-        log.debug("aggregateResourceGroupLocalUsages took {} millisecs", diffMSecs);
+        double diffTimeSeconds = aggrUsageTimer.observeDuration();
+        log.debug("aggregateResourceGroupLocalUsages took {} milliseconds", diffTimeSeconds * 1000);
 
         // Check any re-scheduling requirements for next time.
         // Use the same period as getResourceUsagePublishIntervalInSecs;
@@ -481,8 +546,7 @@ public class ResourceGroupService {
     // from the reports received from other brokers.
     private void calculateQuotaForAllResourceGroups() {
         // Calculate the quota for the next window for this RG, based on the observed usage.
-        long mSecsSinceEpochStart, mSecsSinceEpochEnd, diffMSecs;
-        mSecsSinceEpochStart = System.currentTimeMillis();
+        final Summary.Timer quotaCalcTimer = rgQuotaCalculationLatency.startTimer();
         BytesAndMessagesCount updatedQuota = new BytesAndMessagesCount();
         this.resourceGroupsMap.forEach((rgName, resourceGroup) -> {
             BytesAndMessagesCount globalUsageStats;
@@ -506,16 +570,21 @@ public class ResourceGroupService {
                             localUsageStats.messages,
                             globUsageMessagesArray);
 
-                    resourceGroup.updateLocalQuota(monClass, updatedQuota);
+                    BytesAndMessagesCount oldBMCount = resourceGroup.updateLocalQuota(monClass, updatedQuota);
+                    rgCalculatedQuotaMessages.labels(rgName, monClass.name()).inc(updatedQuota.messages);
+                    rgCalculatedQuotaBytes.labels(rgName, monClass.name()).inc(updatedQuota.bytes);
+                    long messagesIncrement = updatedQuota.messages - oldBMCount.messages;
+                    long bytesIncrement = updatedQuota.bytes - oldBMCount.bytes;
+                    log.debug("calculateQuota for RG {} [class {}]: bytes incremented by {}, messages by {}",
+                            rgName, monClass, messagesIncrement, bytesIncrement);
                 } catch (Throwable t) {
                     log.error("Got exception={} while calculating new quota for monitoring-class={} of RG={}",
                             t.getMessage(), monClass, rgName);
                 }
             }
         });
-        mSecsSinceEpochEnd = System.currentTimeMillis();
-        diffMSecs = mSecsSinceEpochEnd - mSecsSinceEpochStart;
-        log.debug("calculateQuotaForAllResourceGroups took {} millisecs", diffMSecs);
+        double diffTimeSeconds = quotaCalcTimer.observeDuration();
+        log.debug("calculateQuotaForAllResourceGroups took {} milliseconds", diffTimeSeconds * 1000);
 
         // Check any re-scheduling requirements for next time.
         // Use the same period as getResourceUsagePublishIntervalInSecs;
@@ -529,7 +598,8 @@ public class ResourceGroupService {
                         this.resourceUsagePublishPeriodInSeconds, newPeriodInSeconds, timeUnitScale);
             } else {
                 boolean cancelStatus = this.calculateQuotaPeriodicTask.cancel(true);
-                log.info("Got status={} in cancel of periodic when publish period changed from {} to {} {}",
+                log.info("calculateQuotaForAllResourceGroups: Got status={} in cancel of periodic "
+                        + " when publish period changed from {} to {} {}",
                         cancelStatus, this.resourceUsagePublishPeriodInSeconds, newPeriodInSeconds, timeUnitScale);
             }
             this.calculateQuotaPeriodicTask = pulsar.getExecutor().scheduleAtFixedRate(
@@ -626,4 +696,72 @@ public class ResourceGroupService {
     // Setting this to 0 will also make us report in every round.
     // Don't set it to negative values; behavior will be "undefined".
     protected static final float UsageReportSuppressionTolerancePercentage = 5;
+
+    // Labels for the various counters used here.
+    private static final String[] resourceGroupLabel = {"ResourceGroup"};
+    private static final String[] resourceGroupMonitoringclassLabels = {"ResourceGroup", "MonitoringClass"};
+
+    private static final Counter rgCalculatedQuotaBytes = Counter.build()
+            .name("pulsar_resource_group_calculated_bytes_quota")
+            .help("Bytes quota calculated for resource group")
+            .labelNames(resourceGroupMonitoringclassLabels)
+            .register();
+    private static final Counter rgCalculatedQuotaMessages = Counter.build()
+            .name("pulsar_resource_group_calculated_messages_quota")
+            .help("Messages quota calculated for resource group")
+            .labelNames(resourceGroupMonitoringclassLabels)
+            .register();
+
+    private static final Counter rgLocalUsageBytes = Counter.build()
+            .name("pulsar_resource_group_bytes_used")
+            .help("Bytes locally used within this resource group during the last aggregation interval")
+            .labelNames(resourceGroupMonitoringclassLabels)
+            .register();
+    private static final Counter rgLocalUsageMessages = Counter.build()
+            .name("pulsar_resource_group_messages_used")
+            .help("Messages locally used within this resource group during the last aggregation interval")
+            .labelNames(resourceGroupMonitoringclassLabels)
+            .register();
+
+    private static final Counter rgUpdates = Counter.build()
+            .name("pulsar_resource_group_updates")
+            .help("Number of update operations on the given resource group")
+            .labelNames(resourceGroupLabel)
+            .register();
+
+    private static final Counter rgTenantRegisters = Counter.build()
+            .name("pulsar_resource_group_tenant_registers")
+            .help("Number of registrations of tenants")
+            .labelNames(resourceGroupLabel)
+            .register();
+    private static final Counter rgTenantUnRegisters = Counter.build()
+            .name("pulsar_resource_group_tenant_unregisters")
+            .help("Number of un-registrations of tenants")
+            .labelNames(resourceGroupLabel)
+            .register();
+
+    private static final Counter rgNamespaceRegisters = Counter.build()
+            .name("pulsar_resource_group_namespace_registers")
+            .help("Number of registrations of namespaces")
+            .labelNames(resourceGroupLabel)
+            .register();
+    private static final Counter rgNamespaceUnRegisters = Counter.build()
+            .name("pulsar_resource_group_namespace_unregisters")
+            .help("Number of un-registrations of namespaces")
+            .labelNames(resourceGroupLabel)
+            .register();
+
+    private static final Summary rgUsageAggregationLatency = Summary.build()
+            .quantile(0.5, 0.05)
+            .quantile(0.9, 0.01)
+            .name("pulsar_resource_group_aggregate_usage_secs")
+            .help("Time required to aggregate usage of all resource groups, in seconds.")
+            .register();
+
+    private static final Summary rgQuotaCalculationLatency = Summary.build()
+            .quantile(0.5, 0.05)
+            .quantile(0.9, 0.01)
+            .name("pulsar_resource_group_calculate_quota_secs")
+            .help("Time required to calculate quota of all resource groups, in seconds.")
+            .register();
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgs.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgs.java
new file mode 100644
index 0000000..833b82c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/RGUsageMTAggrWaitForAllMesgs.java
@@ -0,0 +1,978 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.resourcegroup;
+
+import com.google.common.collect.Sets;
+import io.prometheus.client.Summary;
+import org.apache.pulsar.broker.resourcegroup.ResourceGroup.BytesAndMessagesCount;
+import org.apache.pulsar.broker.resourcegroup.ResourceGroup.ResourceGroupMonitoringClass;
+import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.ClusterDataImpl;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+// The tests implement a set of producer/consumer operations on a set of topics.
+// [A thread is started for each producer, and each consumer in the test.]
+// The tenants and namespaces in those topics are associated with a set of resource-groups (RGs).
+// After sending/receiving all the messages, traffic usage statistics, and Prometheus-metrics
+// are verified on the RGs.
+public class RGUsageMTAggrWaitForAllMesgs extends ProducerConsumerBase {
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        this.prepareForOps();
+
+        ResourceQuotaCalculator dummyQuotaCalc = new ResourceQuotaCalculator() {
+            @Override
+            public boolean needToReportLocalUsage(long currentBytesUsed, long lastReportedBytes,
+                                                  long currentMessagesUsed, long lastReportedMessages,
+                                                  long lastReportTimeMSecsSinceEpoch) {
+                // Pretend to report every time, just to see the RG-metrics increasing.
+                numLocalUsageReports++;
+                return true;
+            }
+
+            @Override
+            public long computeLocalQuota(long confUsage, long myUsage, long[] allUsages) {
+                return 0;
+            }
+        };
+
+        ResourceUsageTopicTransportManager transportMgr = new ResourceUsageTopicTransportManager(pulsar);
+        this.rgservice = new ResourceGroupService(pulsar, TimeUnit.SECONDS, transportMgr, dummyQuotaCalc);
+
+        this.prepareRGs();
+    }
+
+    @AfterClass(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testMTProduceConsumeRGUsage() throws Exception {
+        int topicSet= 0;
+        for (String[] topicStrings : AllTopicNames) {
+            // AllTopicNames has the topic strings in this order:
+            //      PersistentTopicNamesSameTenanatAndNsRGs[],
+            //      PersistentTopicNamesDifferentTenantAndNsRGs[],
+            //      NonPersistentTopicNamesSameTenantAndNsRGs[],
+            //      NonPersistentTopicNamesDifferentTenantAndNsRGs[].
+            // If the number of RGs is == 1, the "same tenant/NS" topics names and the
+            // "different tenant/NS" topic names coincide; so, there's no need to run them distinctly.
+            if (NumRGs == 1 && (topicSet % 2 == 1)) {
+                log.info("NumRGs={}; skipping repetition of test for topic-set {}", NumRGs, topicSet);
+            } else {
+                testProduceConsumeUsageOnRG(topicStrings);
+                log.info("Done with topic-set {}", topicSet);
+            }
+            topicSet++;
+        }
+        log.info("Done testing with all topics");
+    }
+
+    // A class which implements the producer; the main test thread will spawn multiple producers.
+    private class produceMessages implements Runnable {
+        private int producerId;
+        private int numMesgsToProduce;
+        private String[] topicStrings;
+        private String myProduceTopic;
+
+        private int sentNumBytes = 0;
+        private int sentNumMsgs = 0;
+        private int numExceptions = 0;
+
+        produceMessages(int prodId, int nMesgs, String[] topics) {
+            producerId = prodId;
+            numMesgsToProduce = nMesgs;
+            topicStrings = topics;
+            myProduceTopic = topicStrings[producerId % NumTopics];
+        }
+
+        public int getNumBytesSent() {
+            return sentNumBytes;
+        }
+
+        public int getNumMessagesSent() {
+            return sentNumMsgs;
+        }
+
+        public int getNumExceptions() {
+            return numExceptions;
+        }
+
+        @Override
+        public void run() {
+            Producer<byte[]> producer = null;
+
+            try {
+                // The producer will send messages to a specific topic, since it doesn't make sense for a producer
+                // to produce a message with vagueness about the destination topic (neither do Pulsar APIs allow it).
+                producer = pulsarClient.newProducer()
+                        .topic(myProduceTopic)
+                        .create();
+            } catch (PulsarClientException p) {
+                numExceptions++;
+                log.info("Producer={} got exception while building producer: ex={}",
+                        producerId, p.getMessage());
+            }
+
+            for (int ix = 0; ix < numMesgsToProduce; ix++) {
+                byte[] mesg;
+                try {
+                    mesg = String.format("ProducerId=%d, ix=%d, topic=%s", producerId, ix, myProduceTopic).getBytes();
+                    MessageId msgId = producer.send(mesg);
+                    sentNumBytes += mesg.length;
+                    sentNumMsgs++;
+                    log.debug("Producer={}, sent msg-ix={}, msgId={}", producerId, ix, msgId);
+                } catch (PulsarClientException p) {
+                    numExceptions++;
+                    log.info("Producer={} got exception while sending {}-th time: ex={}",
+                            producerId, ix, p.getMessage());
+                }
+            }
+            try {
+                producer.flush();
+                producer.close();
+            } catch (PulsarClientException p) {
+                numExceptions++;
+                log.info("Producer={} got exception while closing producer: ex={}",
+                        producerId, p.getMessage());
+            }
+
+            log.debug("Producer={} done with topic={}; got {} exceptions", producerId, myProduceTopic, numExceptions);
+        }
+    }
+
+    // Track the producer object, and the thread using it.
+    class producerWithThread {
+        produceMessages producer;
+        Thread thread;
+    }
+
+    // A class which implements the consumer; the main test thread will spawn multiple consumers.
+    private class consumeMessages implements Runnable {
+        private int consumerId;
+        private int numMesgsForThisConsumer;
+        private int numTotalMesgsToConsume;
+        private SubscriptionType subscriptionType;
+
+        private String[] topicStrings;
+        private Consumer<byte[]> consumer = null;
+
+        private int recvTimeoutMilliSecs = 1000;
+        private int ackTimeoutMilliSecs = 1100; // has to be more than 1 second
+        private int recvdNumBytes = 0;
+        private int recvdNumMsgs = 0;
+        private int numExceptions = 0;
+        private volatile boolean allMessagesReceived = false;
+        private boolean consumerIsReady = false;
+
+        consumeMessages(int consId, int nMesgs, int totalMesgs, SubscriptionType subType, String[] topics) {
+            consumerId = consId;
+            numMesgsForThisConsumer = nMesgs;
+            numTotalMesgsToConsume = totalMesgs;
+            subscriptionType = subType;
+            topicStrings = topics;
+        }
+
+        public boolean isConsumerReady() {
+            return consumerIsReady;
+        }
+
+        public int getNumBytesRecvd() {
+            return recvdNumBytes;
+        }
+
+        public int getNumMessagesRecvd() {
+            return recvdNumMsgs;
+        }
+
+        public int getNumExceptions() {
+            return numExceptions;
+        }
+
+        public void setAllMessagesReceived() { allMessagesReceived = true; }
+
+        public void closeConsumer() {
+            try {
+                consumer.close();
+            } catch (PulsarClientException p) {
+                numExceptions++;
+                log.error("Consumer={} got exception while closing consumer: ex={}",
+                        consumerId, p.getMessage());
+            }
+        }
+
+        @Override
+        public void run() {
+            // Create a consumer and subscription, and space for messages, so that they are held for consumption.
+            int recvQueueSize = 0;
+            String subscriptionString = null;
+            switch(subscriptionType) {
+                default:
+                    numExceptions++;
+                    final String errMesg = String.format("Consumer=%d got unexpected subscription type=%s",
+                            consumerId, subscriptionType);
+                    Assert.assertTrue(false, errMesg);
+                    break;
+                case Shared:
+                    recvQueueSize = numTotalMesgsToConsume;
+                    subscriptionString = "my-subscription";
+                    break;
+                case Exclusive:
+                    recvQueueSize = numMesgsForThisConsumer;
+                    subscriptionString = "my-subscription-" + consumerId;
+                    break;
+            }
+
+            try {
+                // The consumer will try to get a message from any of the topics, since Pulsar allows a consumer to
+                // be subscribed to multiple topics.
+                consumer = pulsarClient.newConsumer()
+                        .topic(topicStrings)
+                        .subscriptionName(subscriptionString)
+                        .subscriptionType(subscriptionType)
+                        .receiverQueueSize(recvQueueSize)
+                        .ackTimeout(ackTimeoutMilliSecs, TimeUnit.MILLISECONDS)
+                        .subscribe();
+            } catch (PulsarClientException p) {
+                numExceptions++;
+                log.error("Consumer={} got exception while building consumer: ex={}",
+                        consumerId, p.getMessage());
+            }
+
+            Message<byte[]> message = null;
+            consumerIsReady = true;
+            while (!allMessagesReceived) {
+                log.debug("Consumer={} waiting for mesgnum={}", consumerId, recvdNumMsgs);
+                try {
+                    message = consumer.receive(recvTimeoutMilliSecs, TimeUnit.MILLISECONDS);
+                    if (message != null) {
+                        consumer.acknowledgeAsync(message);
+                        String mesg = String.format("Consumer=%d recvd %d-th mesg; id=%s, data=%s",
+                                consumerId, recvdNumMsgs, message.getMessageId(), new String(message.getData()));
+                        log.debug(mesg);
+                        recvdNumBytes += message.getValue().length;
+                        recvdNumMsgs++;
+                    }
+                } catch (PulsarClientException p) {
+                    numExceptions++;
+                    log.error("Consumer={} got exception in while receiving {}-th mesg at consumer: ex={}",
+                            consumerId, recvdNumMsgs, p.getMessage());
+                }
+            }
+
+            log.debug("Consumer={} done; got {} exceptions", consumerId, numExceptions);
+        }
+    }
+
+    // Track the consumer object, and the thread using it.
+    class consumerWithThread {
+        consumeMessages consumer;
+        Thread thread;
+    }
+
+    // Given a topic, get the tenant RG-name
+    private String TopicToTenantRGName (TopicName topicName) {
+        // Under the current topic naming scheme, the tenant-rg name is just the tenant part of the topic.
+        String tenant = topicName.getTenant();
+        return tenant;
+    }
+
+    // Given a topic, get the namespace RG-name
+    private String TopicToNamespaceRGName (TopicName topicName) {
+    // Under the current topic naming scheme, the namespace-rg name is just the namespace part of the topic.
+        String nameSpace = topicName.getNamespacePortion();
+        return nameSpace;
+    }
+
+    // Return true if the tenant-RG == namespace-RG in the topics given, false otherwise.
+    // If some are equal, and others unequal, throw, because this is unexpected in this UT at the moment.
+    private boolean tenantRGEqualsNamespaceRG(String[] topicStrings) throws PulsarClientException {
+        int numEqualRGs = 0;
+        int numUnEqualRGs = 0;
+        int numTopics = topicStrings.length;
+        for (String topicStr : topicStrings) {
+            TopicName topic = TopicName.get(topicStr);
+            String tenantRG = TopicToTenantRGName(topic);
+            String namespaceRG = TopicToNamespaceRGName(topic);
+            if (tenantRG.compareTo(namespaceRG) == 0) {
+                numEqualRGs++;
+            } else {
+                numUnEqualRGs++;
+            }
+        }
+        if ((numEqualRGs + numUnEqualRGs != numTopics) || (numEqualRGs > 0 && numUnEqualRGs > 0)) {
+            String errMesg = String.format("Found %d topics with equal RGs and %d with unequal, on %d topics",
+                    numEqualRGs, numUnEqualRGs, numTopics);
+            throw new PulsarClientException(errMesg);
+        } else if (numEqualRGs == numTopics) {
+            return true;
+        }
+        return false;
+    }
+
+    private void registerTenantsAndNamespaces(String[] topicStrings) throws Exception {
+        for (String topicStr : topicStrings) {
+            TopicName topic = TopicName.get(topicStr);
+            String tenantRG = TopicToTenantRGName(topic);
+            String namespaceRG = TopicToNamespaceRGName(topic);
+
+            // The tenant name and namespace name parts of the topic are the same as their corresponding RG-names.
+            // Hence, the arguments to register look a little odd.
+            if (!registeredTenants.contains(tenantRG)) {
+                this.rgservice.registerTenant(tenantRG, tenantRG);
+                registeredTenants.add(tenantRG);
+            }
+            if (!registeredNamespaces.contains(namespaceRG)) {
+                this.rgservice.registerNameSpace(namespaceRG, namespaceRG);
+                registeredNamespaces.add(namespaceRG);
+            }
+        }
+    }
+
+    private void unRegisterTenantsAndNamespaces(String[] topicStrings) throws Exception {
+        for (String topicStr : topicStrings) {
+            TopicName topic = TopicName.get(topicStr);
+            String tenantRG = TopicToTenantRGName(topic);
+            String namespaceRG = TopicToNamespaceRGName(topic);
+
+            // The tenant name and namespace name parts of the topic are the same as their corresponding RG-names.
+            // Hence, the arguments to unRegister look a little odd.
+            if (registeredTenants.contains(tenantRG)) {
+                this.rgservice.unRegisterTenant(tenantRG, tenantRG);
+                registeredTenants.remove(tenantRG);
+            }
+            if (registeredNamespaces.contains(namespaceRG)) {
+                this.rgservice.unRegisterNameSpace(namespaceRG, namespaceRG);
+                registeredNamespaces.remove(namespaceRG);
+            }
+        }
+    }
+
+    // Produce/consume messages on the given topics, and verify that the resource-group stats are updated.
+    private void testProduceConsumeUsageOnRG(String[] topicStrings) throws Exception {
+        createRGs();
+        createTopics(topicStrings);
+        registerTenantsAndNamespaces(topicStrings);
+
+        final int TotalExpectedMessagesToSend = NumTotalMessages;
+        final int TotalExpectedMessagesToReceive = TotalExpectedMessagesToSend;
+
+        final SubscriptionType consumeSubscriptionType = SubscriptionType.Shared;  // Shared, or Exclusive
+
+        producerWithThread prodThr[] = new producerWithThread[NumProducers];
+        consumerWithThread consThr[] = new consumerWithThread[NumConsumers];
+        int sentNumBytes = 0;
+        int sentNumMsgs = 0;
+        int numProducerExceptions = 0;
+
+//        // Verify that stats on the topic are all-zero in the beginning.
+//        this.verfyRGProdConsStats(topicStrings, 0, 0, 0, 0,true, true);
+
+        // Fork some consumers to receive the messages.
+        for (int ix = 0; ix < NumConsumers; ix++) {
+            consThr[ix] = new consumerWithThread();
+            consumeMessages cm = new consumeMessages(ix, NumMessagesPerConsumer, TotalExpectedMessagesToReceive,
+                                                     consumeSubscriptionType, topicStrings);
+            Thread thr = new Thread(cm);
+            thr.start();
+            consThr[ix].consumer = cm;
+            consThr[ix].thread = thr;
+        }
+
+        // Wait for all consumers to be ready, before forking producers, so we don't lose messages.
+        int numReadyConsumers;
+        do {
+            Thread.sleep(500);
+            numReadyConsumers = 0;
+            for (int ix = 0; ix < NumConsumers; ix++) {
+                if (consThr[ix].consumer.isConsumerReady()) {
+                    numReadyConsumers++;
+                }
+            }
+            log.debug("{} consumers are not yet ready", NumConsumers - numReadyConsumers);
+        } while (numReadyConsumers < NumConsumers);
+
+        // Fork some producers to send the messages.
+        for (int ix = 0; ix < NumProducers; ix++) {
+            prodThr[ix] = new producerWithThread();
+            produceMessages pm = new produceMessages(ix, NumMessagesPerProducer, topicStrings);
+            Thread thr = new Thread(pm);
+            thr.start();
+            prodThr[ix].producer = pm;
+            prodThr[ix].thread = thr;
+        }
+
+        // Wait for the producers to complete.
+        int sentMsgs, sentBytes;
+        for (int ix = 0; ix < NumProducers; ix++) {
+            prodThr[ix].thread.join();
+            sentBytes = prodThr[ix].producer.getNumBytesSent();
+            sentMsgs = prodThr[ix].producer.getNumMessagesSent();
+            numProducerExceptions += prodThr[ix].producer.getNumExceptions();
+
+            log.debug("Producer={} sent {} mesgs and {} bytes", ix, sentMsgs, sentBytes);
+            sentNumBytes += sentBytes;
+            sentNumMsgs += sentMsgs;
+        }
+        Assert.assertEquals(sentNumMsgs, TotalExpectedMessagesToSend);
+        Assert.assertEquals(numProducerExceptions, 0);
+
+        int recvdNumBytes = 0;
+        int recvdNumMsgs;
+        int numConsumerExceptions = 0;
+
+        // Wait for the consumers to receive all the messages.
+        do {
+            Thread.sleep(2000);
+            recvdNumMsgs = 0;
+            int consNumMesgsRecvd;
+            for (int ix = 0; ix < NumConsumers; ix++) {
+                consNumMesgsRecvd = consThr[ix].consumer.getNumMessagesRecvd();
+                recvdNumMsgs += consNumMesgsRecvd;
+                log.debug("consumer={} received {} messages (current total {}, expected {})",
+                        ix, consNumMesgsRecvd, recvdNumMsgs, TotalExpectedMessagesToReceive);
+            }
+        } while (recvdNumMsgs < TotalExpectedMessagesToReceive);
+
+        // Tell the consumers that all expected messages have been received (but don't close them yet).
+        for (int ix = 0; ix < NumConsumers; ix++) {
+            consThr[ix].consumer.setAllMessagesReceived();
+            log.debug("consumer={} told to stop", ix);
+        }
+
+        boolean[] joinedConsumers = new boolean[NumConsumers];
+        for (boolean b : joinedConsumers) {
+            b = false;
+        }
+
+        recvdNumMsgs = recvdNumBytes = 0;
+        int numConsumersDone = 0;
+        int recvdMsgs, recvdBytes;
+        while (numConsumersDone < NumConsumers) {
+            for (int ix = 0; ix < NumConsumers; ix++) {
+                if (joinedConsumers[ix] == false)  {
+                    recvdBytes = consThr[ix].consumer.getNumBytesRecvd();
+                    recvdMsgs = consThr[ix].consumer.getNumMessagesRecvd();
+                    numConsumerExceptions += consThr[ix].consumer.getNumExceptions();
+                    log.debug("Consumer={} received {} mesgs and {} bytes", ix, recvdMsgs, recvdBytes);
+
+                    consThr[ix].thread.join();
+                    joinedConsumers[ix] = true;
+                    log.debug("Joined consumer={}", ix);
+
+                    recvdNumBytes += recvdBytes;
+                    recvdNumMsgs += recvdMsgs;
+                    numConsumersDone++;
+                }
+            }
+        }
+
+        // Close the consumers.
+        for (int ix = 0; ix < NumConsumers; ix++) {
+            consThr[ix].consumer.closeConsumer();
+        }
+
+        Assert.assertEquals(recvdNumMsgs, TotalExpectedMessagesToReceive);
+        Assert.assertEquals(numConsumerExceptions, 0);
+
+//        // Verify the producer side stats (msgInCounter/bytesInCounter) only.
+//        this.verfyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs, 0, 0,true, false);
+
+        // Verify producer and consumer side stats.
+        this.verfyRGProdConsStats(topicStrings, sentNumBytes, sentNumMsgs,
+                recvdNumBytes, recvdNumMsgs, true, true);
+
+        // Verify the metrics corresponding to the operations in this test.
+        this.verifyRGMetrics(topicStrings, sentNumBytes, sentNumMsgs, recvdNumBytes, recvdNumMsgs);
+
+        unRegisterTenantsAndNamespaces(topicStrings);
+        destroyTopics(topicStrings);
+        destroyRGs();
+    }
+
+    // Verify the app stats with what we see from the broker-service, and the resource-group (which in turn internally
+    // derives stats from the broker service)
+    private void verfyRGProdConsStats(String[] topicStrings,
+                            int sentNumBytes, int sentNumMsgs,
+                            int recvdNumBytes, int recvdNumMsgs,
+                            boolean checkProduce, boolean checkConsume)  throws Exception {
+
+        boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
+        BrokerService bs = pulsar.getBrokerService();
+        Map<String, TopicStatsImpl> topicStatsMap = bs.getTopicStats();
+
+        log.info("verfyProdConsStats: topicStatsMap has {} entries", topicStatsMap.size());
+
+        // Pulsar runtime adds some additional bytes in the exchanges: a 45-byte per-message
+        // metadata of some kind, plus more as the number of messages increases.
+        // Hence the ">=" assertion with ExpectedNumBytesSent/Received in the following checks.
+        final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
+        final int ExpectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;
+
+        long totalOutMessages = 0, totalOutBytes = 0;
+        long totalInMessages = 0, totalInBytes = 0;
+        BytesAndMessagesCount totalTenantRGProdCounts = new BytesAndMessagesCount();
+        BytesAndMessagesCount totalTenantRGConsCounts = new BytesAndMessagesCount();
+        BytesAndMessagesCount totalNsRGProdCounts = new BytesAndMessagesCount();
+        BytesAndMessagesCount totalNsRGConsCounts = new BytesAndMessagesCount();
+        BytesAndMessagesCount prodCounts, consCounts;
+
+        // If the tenant and NS are on different RGs, the bytes/messages get counted once on the
+        // tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG.
+        // This is a known (and discussed) artifact in the implementation.
+        // 'ScaleFactor' is a way to incorporate that effect in the verification.
+        final int ScaleFactor = tenantRGEqualsNsRG ? 1 : 2;
+
+        // Following sleep is to get the broker-service to gather stats on the topics.
+        // [There appears to be some asynchrony there.]
+        // Thread.sleep(5 * 1000);
+
+        // Since the following walk is on topics, keep track of the RGs for which we have already gathered stats,
+        // so that we do not double-accumulate stats if multiple topics refer to the same RG.
+        HashSet<String> RGsWithPublisStatsGathered = new HashSet<>();
+        HashSet<String> RGsWithDispatchStatsGathered = new HashSet<>();
+
+        for (Map.Entry<String, TopicStatsImpl> entry : topicStatsMap.entrySet()) {
+            String mapTopicName = entry.getKey();
+            if (Arrays.asList(topicStrings).contains(mapTopicName)) {
+                TopicStats stats = entry.getValue();
+                totalInMessages += stats.getMsgInCounter();
+                totalInBytes += stats.getBytesInCounter();
+                totalOutMessages += stats.getMsgOutCounter();
+                totalOutBytes += stats.getBytesOutCounter();
+
+                // Assuming that broker-service stats-gathering is doing its job,
+                // we should see some produced mesgs on every topic.
+                if (totalInMessages == 0) {
+                    log.warn("verfyProdConsStats: found no produced mesgs (msgInCounter) on topic {}", mapTopicName);
+                }
+
+                if (sentNumMsgs > 0 || recvdNumMsgs > 0) {
+                    TopicName topic = TopicName.get(mapTopicName);
+
+                    // Hack to ensure aggregator calculation without waiting for a period of aggregation.
+                    // [aggregateResourceGroupLocalUsages() is idempotent when there's no fresh traffic flowing.]
+                    this.rgservice.aggregateResourceGroupLocalUsages();
+
+                    final String tenantRGName = TopicToTenantRGName(topic);
+                    if (!RGsWithPublisStatsGathered.contains(tenantRGName)) {
+                        prodCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Publish);
+                        totalTenantRGProdCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, prodCounts);
+                        RGsWithPublisStatsGathered.add(tenantRGName);
+                    }
+                    if (!RGsWithDispatchStatsGathered.contains(tenantRGName)) {
+                        consCounts = this.rgservice.getRGUsage(tenantRGName, ResourceGroupMonitoringClass.Dispatch);
+                        totalTenantRGConsCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, consCounts);
+                        RGsWithDispatchStatsGathered.add(tenantRGName);
+                    }
+
+                    final String nsRGName = TopicToNamespaceRGName(topic);
+                    // If tenantRGName == nsRGName, the RG-infra will avoid double counting.
+                    // We will do the same here, to get the expected stats.
+                    if (tenantRGName.compareTo(nsRGName) != 0) {
+                        if (!RGsWithPublisStatsGathered.contains(nsRGName)) {
+                            prodCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Publish);
+                            totalNsRGProdCounts = ResourceGroup.accumulateBMCount(totalNsRGProdCounts, prodCounts);
+                            RGsWithPublisStatsGathered.add(nsRGName);
+                        }
+                        if (!RGsWithDispatchStatsGathered.contains(nsRGName)) {
+                            consCounts = this.rgservice.getRGUsage(nsRGName, ResourceGroupMonitoringClass.Dispatch);
+                            totalNsRGConsCounts = ResourceGroup.accumulateBMCount(totalNsRGConsCounts, consCounts);
+                            RGsWithDispatchStatsGathered.add(nsRGName);
+                        }
+                    }
+                }
+            }
+        }
+
+        // Check that the accumulated totals tally up.
+        if (checkConsume && checkProduce) {
+            Assert.assertEquals(totalOutMessages, totalInMessages);
+            Assert.assertEquals(totalOutBytes, totalInBytes);
+        }
+
+        if (checkProduce) {
+            Assert.assertEquals(totalInMessages, sentNumMsgs);
+            Assert.assertTrue(totalInBytes >= ExpectedNumBytesSent);
+        }
+
+        if (checkConsume) {
+            Assert.assertEquals(totalOutMessages, recvdNumMsgs);
+            Assert.assertTrue(totalOutBytes >= ExpectedNumBytesReceived);
+        }
+
+        if (checkProduce) {
+            prodCounts = ResourceGroup.accumulateBMCount(totalTenantRGProdCounts, totalNsRGProdCounts);
+            Assert.assertEquals(prodCounts.messages, sentNumMsgs * ScaleFactor);
+            Assert.assertTrue(prodCounts.bytes >= ExpectedNumBytesSent * ScaleFactor);
+        }
+
+        if (checkConsume) {
+            consCounts = ResourceGroup.accumulateBMCount(totalTenantRGConsCounts, totalNsRGConsCounts);
+            Assert.assertEquals(consCounts.messages, recvdNumMsgs * ScaleFactor);
+            Assert.assertTrue(consCounts.bytes >= ExpectedNumBytesReceived * ScaleFactor);
+        }
+    }
+
+    // Check the metrics for the RGs involved
+    private void verifyRGMetrics(String[] topicStrings,
+                                 int sentNumBytes, int sentNumMsgs,
+                                 int recvdNumBytes, int recvdNumMsgs) throws Exception {
+
+        boolean tenantRGEqualsNsRG = tenantRGEqualsNamespaceRG(topicStrings);
+        final int ExpectedNumBytesSent = sentNumBytes + PER_MESSAGE_METADATA_OHEAD * sentNumMsgs;
+        final int ExpectedNumBytesReceived = recvdNumBytes + PER_MESSAGE_METADATA_OHEAD * recvdNumMsgs;
+        long totalTenantRegisters = 0;
+        long totalTenantUnRegisters = 0;
+        long totalNamespaceRegisters = 0;
+        long totalNamespaceUnRegisters = 0;
+        long[] totalQuotaBytes = new long[ResourceGroupMonitoringClass.values().length];
+        long[] totalQuotaMessages = new long[ResourceGroupMonitoringClass.values().length];
+        long[] totalUsedBytes = new long[ResourceGroupMonitoringClass.values().length];
+        long[] totalUsedMessages = new long[ResourceGroupMonitoringClass.values().length];
+        long[] totalUsageReportCounts = new long[ResourceGroupMonitoringClass.values().length];
+        long totalUpdates = 0;
+
+        // If the tenant and NS are on different RGs, the bytes/messages get counted once on the
+        // tenant RG, and again on the namespace RG. This double-counting is avoided if tenant-RG == NS-RG.
+        // This is a known (and discussed) artifact in the implementation.
+        // 'ScaleFactor' is a way to incorporate that effect in the verification.
+        final int ScaleFactor = tenantRGEqualsNsRG ? 1 : 2;
+
+        ResourceGroupService rgs = this.rgservice;
+        for (String rgName : RGNames) {
+            for (ResourceGroupMonitoringClass mc : ResourceGroupMonitoringClass.values()) {
+                String mcName = mc.name();
+                int mcIndex = mc.ordinal();
+                double quotaBytes = rgs.getRgQuotaByteCount(rgName, mcName);
+                totalQuotaBytes[mcIndex] += quotaBytes;
+                double quotaMesgs = rgs.getRgQuotaMessageCount(rgName, mcName);
+                totalQuotaMessages[mcIndex] += quotaMesgs;
+                double usedBytes = rgs.getRgLocalUsageByteCount(rgName, mcName);
+                totalUsedBytes[mcIndex] += usedBytes;
+                double usedMesgs = rgs.getRgLocalUsageMessageCount(rgName, mcName);
+                totalUsedMessages[mcIndex] += usedMesgs;
+
+                double usageReportedCount = ResourceGroup.getRgUsageReportedCount(rgName, mcName);
+                totalUsageReportCounts[mcIndex] += usageReportedCount;
+            }
+
+            totalTenantRegisters += rgs.getRgTenantRegistersCount(rgName);
+            totalTenantUnRegisters += rgs.getRgTenantUnRegistersCount(rgName);
+            totalNamespaceRegisters += rgs.getRgNamespaceRegistersCount(rgName);
+            totalNamespaceUnRegisters += rgs.getRgNamespaceUnRegistersCount(rgName);;
+            totalUpdates += rgs.getRgUpdatesCount(rgName);
+        }
+        log.info("totalTenantRegisters={}, totalTenantUnRegisters={}, " +
+                        "totalNamespaceRegisters={}, totalNamespaceUnRegisters={}",
+                totalTenantRegisters, totalTenantUnRegisters, totalNamespaceRegisters, totalNamespaceUnRegisters);
+
+        // On each run, there will be 'NumRGs' registrations
+        Assert.assertEquals(totalTenantRegisters - residualTenantRegs, NumRGs);
+        Assert.assertEquals(totalNamespaceRegisters - residualNamespaceRegs, NumRGs);
+
+        // The unregisters will lag the registers by one round (because verifyRGMetrics() is called
+        // prior to unregister). In other words, their numbers will equal the residuals for the registers.
+        Assert.assertEquals(totalTenantUnRegisters, residualTenantRegs);
+        Assert.assertEquals(totalNamespaceUnRegisters, residualNamespaceRegs);
+
+        // Update residuals for next test run.
+        residualTenantRegs = totalTenantRegisters;
+        residualNamespaceRegs = totalNamespaceRegisters;
+
+        for (ResourceGroupMonitoringClass mc : ResourceGroupMonitoringClass.values()) {
+            int mcIdx = mc.ordinal();
+            log.info("mc={}: totalQuotaBytes={}, totalQuotaMessages={}, " +
+                            " totalUsedBytes={}, totalUsedMessages={}" +
+                            " totalUsageReports={}",
+                    mc.name(), totalQuotaBytes[mcIdx], totalQuotaMessages[mcIdx],
+                    totalUsedBytes[mcIdx], totalUsedMessages[mcIdx], totalUsageReportCounts[mcIdx]);
+            // On each run, the bytes/messages are monotone incremented in Prometheus metrics.
+            // So, we take the residuals into account when comparing against the expected.
+            if (mc == ResourceGroupMonitoringClass.Publish) {
+                Assert.assertEquals(totalUsedMessages[mcIdx] - residualSentNumMessages,
+                                                                                sentNumMsgs * ScaleFactor);
+                Assert.assertTrue(totalUsedBytes[mcIdx] - residualSentNumBytes
+                                                                                >= ExpectedNumBytesSent * ScaleFactor);
+            } else if (mc == ResourceGroupMonitoringClass.Dispatch) {
+                Assert.assertEquals(totalUsedMessages[mcIdx] - residualRecvdNumMessages,
+                                                                                recvdNumMsgs * ScaleFactor);
+                Assert.assertTrue(totalUsedBytes[mcIdx] - residualRecvdNumBytes
+                                                                            >= ExpectedNumBytesReceived * ScaleFactor);
+            }
+
+            long perClassUsageReports = numLocalUsageReports / ResourceGroupMonitoringClass.values().length;
+            Assert.assertEquals(totalUsageReportCounts[mcIdx], perClassUsageReports);
+        }
+
+        // Update the residuals for next round of tests.
+        residualSentNumBytes += sentNumBytes * ScaleFactor;
+        residualSentNumMessages += sentNumMsgs * ScaleFactor;
+        residualRecvdNumBytes += recvdNumBytes * ScaleFactor;
+        residualRecvdNumMessages += recvdNumMsgs * ScaleFactor;
+
+        Assert.assertEquals(totalUpdates, 0);  // currently, we don't update the RGs in this UT
+
+        // Basic check that latency metrics are doing some work.
+        Summary.Child.Value usageAggrLatency = rgs.getRgUsageAggregationLatency();
+        Assert.assertNotEquals(usageAggrLatency.count, 0);
+        Assert.assertNotEquals(usageAggrLatency.sum, 0);
+        double fiftiethPercentileValue = usageAggrLatency.quantiles.get(0.5);
+        Assert.assertNotEquals(fiftiethPercentileValue, 0);
+        double ninetethPercentileValue = usageAggrLatency.quantiles.get(0.9);
+        Assert.assertNotEquals(ninetethPercentileValue, 0);
+
+        Summary.Child.Value quotaCalcLatency = rgs.getRgQuotaCalculationTime();
+        Assert.assertNotEquals(quotaCalcLatency.count, 0);
+        Assert.assertNotEquals(quotaCalcLatency.sum, 0);
+        fiftiethPercentileValue = quotaCalcLatency.quantiles.get(0.5);
+        Assert.assertNotEquals(fiftiethPercentileValue, 0);
+        ninetethPercentileValue = quotaCalcLatency.quantiles.get(0.9);
+        Assert.assertNotEquals(ninetethPercentileValue, 0);
+    }
+
    private static final Logger log = LoggerFactory.getLogger(RGUsageMTAggrWaitForAllMesgs.class);

    // Empirically, there appears to be a 45-byte overhead for metadata, imposed by Pulsar runtime.
    private final int PER_MESSAGE_METADATA_OHEAD = 45;

    // Interval (seconds) at which local resource-usage reports are published on the transport.
    private static final int PUBLISH_INTERVAL_SECS = 10;
    private final int NumProducers = 32;
    private final int NumConsumers = 64;
    private final int NumMessagesPerProducer = 100;
    private final int NumTopics = 32;  // Set == NumProducers, so each producer can send on its own topic
    private final int NumRGs = 16; // arbitrarily, half of NumTopics, so 2 topics map to each RG
    private final int NumTotalMessages = NumMessagesPerProducer * NumProducers;
    private final int NumMessagesPerConsumer = NumTotalMessages / NumConsumers;
    // Shared rate-limit configuration applied to every RG created by this test.
    private final org.apache.pulsar.common.policies.data.ResourceGroup rgConfig =
                                                            new org.apache.pulsar.common.policies.data.ResourceGroup();
    private ResourceGroupService rgservice;
    private ResourceGroup[] resGroups = new ResourceGroup[NumRGs];

    private final String clusterName = "test";
    private final String BaseRGName = "rg-";
    private final String BaseTestTopicName = "rgusage-topic-";

    // RG names, "rg-0" .. "rg-<NumRGs-1>"; also reused as the tenant names.
    private String[] RGNames = new String[NumRGs];

    // The number of times we pretend to have not suppressed sending a local usage report.
    private long numLocalUsageReports;

    // Combinations of tenant and namespace required to test RG use cases when both NS and tenant are under the control
    // of the same RG, vs. cases where they are under the control of distinct RGs.
    // [This is required to test the special case of "tenant and NS refer to the same RG", because in that case
    // we don't double-count the usage.]
    // Same-order mapping: e.g., rg-0/rg-0 (for 0th entry)
    private String[] TenantAndNsNameSameOrder = new String[NumRGs];

    // Opposite order mapping: e.g., rg-0/rg-49 (for 0th entry with 50 RGs)
    private String[] TenantAndNsNameOppositeOrder = new String[NumRGs];

    // Similar to above (same and opposite order) for topics.
    // E.g., rg-0/rg-0/rgusage-topic0 for 0-th topic in "same order"
    // and rg-0/rg-49/rgusage-topic0 for 0-th topic in "opposite order", with 50 RGs
    private String[] TopicNamesSameTenantAndNsRGs = new String[NumTopics];
    private String[] TopicNamesDifferentTenantAndNsRGs = new String[NumTopics];

    // Persistent and non-persistent topic strings with the above names.
    // NOTE(review): "Tenanat" below is a typo for "Tenant"; renaming would touch all
    // references (including AllTopicNames and the topic-name setup), so it is left as-is here.
    private String[] PersistentTopicNamesSameTenanatAndNsRGs = new String[NumTopics];
    private String[] PersistentTopicNamesDifferentTenantAndNsRGs = new String[NumTopics];
    private String[] NonPersistentTopicNamesSameTenantAndNsRGs = new String[NumTopics];
    private String[] NonPersistentTopicNamesDifferentTenantAndNsRGs = new String[NumTopics];

    // All four topic-name families, for tests that iterate over every combination.
    private List<String[]> AllTopicNames = Arrays.asList(
            PersistentTopicNamesSameTenanatAndNsRGs,
            PersistentTopicNamesDifferentTenantAndNsRGs,
            NonPersistentTopicNamesSameTenantAndNsRGs,
            NonPersistentTopicNamesDifferentTenantAndNsRGs);

    // Keep track of the namespaces that were created, so we don't dup and get exceptions
    HashSet<String> createdNamespaces = new HashSet<>();

    // Keep track of the topics that were created, so we don't dup and get exceptions
    HashSet<String> createdTopics = new HashSet<>();

    // Keep track of the tenants that have been registered to their RGs, so we don't dup and get exceptions
    HashSet<String> registeredTenants = new HashSet<>();

    // Keep track of the namespaces that have been registered to their RGs, so we don't dup and get exceptions
    HashSet<String> registeredNamespaces = new HashSet<>();

    // Prometheus stats are monotonically increasing numbers.
    // On each run, the resource-group metrics are incremented in Prometheus.
    // So, we keep some residuals to help isolate/verify "this run's" values.
    long residualTenantRegs;
    long residualNamespaceRegs;
    long residualSentNumBytes;
    long residualSentNumMessages;
    long residualRecvdNumBytes;
    long residualRecvdNumMessages;
+
+    // Create the topics provided
+    private void createTopics(String[] topics) {
+        BrokerService bs = this.pulsar.getBrokerService();
+        for (String topic : topics) {
+            if (!createdTopics.contains(topic)) {
+                bs.getOrCreateTopic(topic);
+                createdTopics.add(topic);
+            }
+        }
+    }
+
+    // Destroy the topics provided
+    private void destroyTopics(String[] topics) {
+        BrokerService bs = this.pulsar.getBrokerService();
+        for (String topic : topics) {
+            if (!createdTopics.contains(topic)) {
+                bs.deleteTopic(topic, true);
+                createdTopics.remove(topic);
+            }
+        }
+    }
+
+    // Create all the RGs named in RGNames[]
+    private void createRGs() throws Exception {
+        for (String rgname : RGNames) {
+            this.rgservice.resourceGroupCreate(rgname, rgConfig);
+        }
+    }
+
+    // Destroy all the RGs named in RGNames[]
+    private void destroyRGs() throws Exception {
+        for (String rgname : RGNames) {
+            this.rgservice.resourceGroupDelete(rgname);
+        }
+    }
+
+    // Initial set up for transport manager and cluster creation.
+    private void prepareForOps() throws PulsarAdminException {
+        this.conf.setResourceUsageTransportPublishIntervalInSecs(PUBLISH_INTERVAL_SECS);
+        this.conf.setAllowAutoTopicCreation(true);
+        admin.clusters().createCluster(clusterName, ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
+    }
+
+    // Set up of RG/tenant/namespaces/topic names, and checking of the test parameters.
+    private void prepareRGs() throws Exception {
+        // Check for a few invariants which allow easier mapping of structures in the test.
+
+        // Ensure that the number of consumers is a multiple of the number of producers.
+        Assert.assertTrue(NumConsumers >= NumProducers && NumConsumers % NumProducers == 0);
+
+        // Number of messages is a multiple of the number of topics.
+        Assert.assertEquals(NumTotalMessages % NumTopics,  0);
+
+        // Ensure that the number of topics is a multiple of the number of RGs.
+        Assert.assertEquals(NumTopics % NumRGs,  0);
+
+        // Ensure that the messages-per-consumer is an integral multiple of the number of consumers.
+        final int NumConsumerMessages = NumMessagesPerConsumer * NumConsumers;
+        final int NumProducerMessages = NumMessagesPerProducer * NumProducers;
+        Assert.assertTrue(NumMessagesPerConsumer > 0 && NumConsumerMessages == NumProducerMessages);
+
+        rgConfig.setPublishRateInBytes(1500);
+        rgConfig.setPublishRateInMsgs(100);
+        rgConfig.setDispatchRateInBytes(4000);
+        rgConfig.setDispatchRateInMsgs(500);
+
+        // Set up the RG names; creation of RGs will be done elsewhere.
+        for (int ix = 0; ix < NumRGs; ix++) {
+            RGNames[ix] = BaseRGName + ix;
+        }
+
+        // Create all the tenants
+        final TenantInfo configInfo =
+                new TenantInfoImpl(Sets.newHashSet("fakeAdminRole"), Sets.newHashSet(clusterName));
+        for (int ix = 0; ix < NumRGs; ix++) {
+            admin.tenants().createTenant(RGNames[ix], configInfo);
+        }
+
+        // Set up the tenant-and-nsname mapping strings, for same and opposite order of RGs.
+        for (int ix = 0; ix < NumRGs; ix++) {
+            TenantAndNsNameSameOrder[ix] = RGNames[ix] + "/" + RGNames[ix];
+            TenantAndNsNameOppositeOrder[ix] = RGNames[ix] + "/" + RGNames[NumRGs - (ix+1)];
+        }
+
+        // Create all the namespaces
+        for (int ix = 0; ix < NumRGs; ix++) {
+            if (!createdNamespaces.contains(TenantAndNsNameSameOrder[ix])) {
+                admin.namespaces().createNamespace(TenantAndNsNameSameOrder[ix]);
+                admin.namespaces().setNamespaceReplicationClusters(
+                        TenantAndNsNameSameOrder[ix], Sets.newHashSet(clusterName));
+                createdNamespaces.add(TenantAndNsNameSameOrder[ix]);
+            }
+
+            if (!createdNamespaces.contains(TenantAndNsNameOppositeOrder[ix])) {
+                admin.namespaces().createNamespace(TenantAndNsNameOppositeOrder[ix]);
+                admin.namespaces().setNamespaceReplicationClusters(
+                        TenantAndNsNameOppositeOrder[ix], Sets.newHashSet(clusterName));
+                createdNamespaces.add(TenantAndNsNameOppositeOrder[ix]);
+            }
+        }
+
+        // Create all the topic name strings
+        for (int ix = 0; ix < NumTopics; ix++) {
+            TopicNamesSameTenantAndNsRGs[ix] =
+                    TenantAndNsNameSameOrder[ix % NumRGs] + "/" + BaseTestTopicName + ix;
+            TopicNamesDifferentTenantAndNsRGs[ix] =
+                    TenantAndNsNameOppositeOrder[ix % NumRGs] + "/" + BaseTestTopicName + ix;
+        }
+
+        // Create all the persistent and non-persistent topic strings
+        for (int ix = 0; ix < NumTopics; ix++) {
+            PersistentTopicNamesSameTenanatAndNsRGs[ix] =
+                    "persistent://" + TopicNamesSameTenantAndNsRGs[ix];
+            PersistentTopicNamesDifferentTenantAndNsRGs[ix] =
+                    "persistent://" + TopicNamesDifferentTenantAndNsRGs[ix];
+            NonPersistentTopicNamesSameTenantAndNsRGs[ix] =
+                    "non-persistent://" + TopicNamesSameTenantAndNsRGs[ix];
+            NonPersistentTopicNamesDifferentTenantAndNsRGs[ix] =
+                    "non-persistent://" + TopicNamesDifferentTenantAndNsRGs[ix];
+        }
+    }
+}