You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by dd...@apache.org on 2021/07/29 09:37:15 UTC

[zookeeper] branch master updated: ZOOKEEPER-4211: Expose Per Namespace Quota Metrics to Prometheus

This is an automated email from the ASF dual-hosted git repository.

ddiederen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new b05ea1a  ZOOKEEPER-4211: Expose Per Namespace Quota Metrics to Prometheus
b05ea1a is described below

commit b05ea1a8454d806394e1ecdfca550e8c709d4c21
Author: liwang <li...@apple.com>
AuthorDate: Thu Jul 29 09:35:47 2021 +0000

    ZOOKEEPER-4211: Expose Per Namespace Quota Metrics to Prometheus
    
    Motivation
    
    In 3.7, quota limits can be enforced and the quota-related stats are captured. From the "listquota" CLI command, we can see the quota limit and usage info. This change builds on that so we can collect the quota metrics per top namespace and expose them to Prometheus for monitoring and alerting purposes.
    
    Summary of Changes
    
    - added 5 quota metrics
    - added GaugeSet metric type to group gauge metrics by key
    - changed PrometheusMetricsProvider to support the GaugeSet
    - changed ZooKeeperServer and DataTree to collect/publish quota metrics
    
    Author: liwang <li...@apple.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Damien Diederen <dd...@apache.org>
    
    Closes #1644 from li4wang/ZOOKEEPER-4211
---
 .../prometheus/PrometheusMetricsProvider.java      |  94 ++++-
 .../prometheus/PrometheusMetricsProviderTest.java  | 395 +++++++++++++++++++--
 .../main/java/org/apache/zookeeper/StatsTrack.java |  18 +
 .../org/apache/zookeeper/common/PathUtils.java     |  14 +
 .../org/apache/zookeeper/metrics/CounterSet.java   |  47 +++
 .../org/apache/zookeeper/metrics/GaugeSet.java     |  37 ++
 .../apache/zookeeper/metrics/MetricsContext.java   |  30 ++
 .../metrics/impl/DefaultMetricsProvider.java       |  40 +++
 .../metrics/impl/NullMetricsProvider.java          |  24 ++
 .../java/org/apache/zookeeper/server/DataTree.java |  10 +-
 .../org/apache/zookeeper/server/ServerMetrics.java |   6 +
 .../apache/zookeeper/server/ZooKeeperServer.java   |  34 +-
 .../zookeeper/server/metric/SimpleCounterSet.java  |  55 +++
 .../zookeeper/server/util/QuotaMetricsUtils.java   | 167 +++++++++
 .../org/apache/zookeeper/common/PathUtilsTest.java |   9 +
 .../zookeeper/server/ZooKeeperServerTest.java      |  26 ++
 .../server/metric/SimpleCounterSetTest.java        |  61 ++++
 .../server/util/QuotaMetricsUtilsTest.java         | 326 +++++++++++++++++
 .../apache/zookeeper/test/EnforceQuotaTest.java    |   9 +-
 .../apache/zookeeper/test/ZooKeeperQuotaTest.java  |  88 +++--
 20 files changed, 1417 insertions(+), 73 deletions(-)

diff --git a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java
index bcafdd9..e17b097 100644
--- a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java
+++ b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/main/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProvider.java
@@ -43,7 +43,9 @@ import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.metrics.Counter;
+import org.apache.zookeeper.metrics.CounterSet;
 import org.apache.zookeeper.metrics.Gauge;
+import org.apache.zookeeper.metrics.GaugeSet;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.metrics.MetricsProvider;
 import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
@@ -223,6 +225,9 @@ public class PrometheusMetricsProvider implements MetricsProvider {
     private void sampleGauges() {
         rootContext.gauges.values()
                 .forEach(PrometheusGaugeWrapper::sample);
+
+        rootContext.gaugeSets.values()
+                .forEach(PrometheusLabelledGaugeWrapper::sample);
     }
 
     @Override
@@ -233,7 +238,9 @@ public class PrometheusMetricsProvider implements MetricsProvider {
     private class Context implements MetricsContext {
 
         private final ConcurrentMap<String, PrometheusGaugeWrapper> gauges = new ConcurrentHashMap<>();
+        private final ConcurrentMap<String, PrometheusLabelledGaugeWrapper> gaugeSets = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, PrometheusCounter> counters = new ConcurrentHashMap<>();
+        private final ConcurrentMap<String, PrometheusLabelledCounter> counterSets = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, PrometheusSummary> basicSummaries = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, PrometheusSummary> summaries = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, PrometheusLabelledSummary> basicSummarySets = new ConcurrentHashMap<>();
@@ -250,6 +257,12 @@ public class PrometheusMetricsProvider implements MetricsProvider {
             return counters.computeIfAbsent(name, PrometheusCounter::new);
         }
 
+        @Override
+        public CounterSet getCounterSet(final String name) {
+            Objects.requireNonNull(name, "Cannot register a CounterSet with null name");
+            return counterSets.computeIfAbsent(name, PrometheusLabelledCounter::new);
+        }
+
         /**
          * Gauges may go up and down, in ZooKeeper they are a way to export
          * internal values with a callback.
@@ -273,6 +286,25 @@ public class PrometheusMetricsProvider implements MetricsProvider {
         }
 
         @Override
+        public void registerGaugeSet(final String name, final GaugeSet gaugeSet) {
+            Objects.requireNonNull(name, "Cannot register a GaugeSet with null name");
+            Objects.requireNonNull(gaugeSet, "Cannot register a null GaugeSet for " + name);
+
+            gaugeSets.compute(name, (id, prev) ->
+                new PrometheusLabelledGaugeWrapper(name, gaugeSet, prev != null ? prev.inner : null));
+        }
+
+        @Override
+        public void unregisterGaugeSet(final String name) {
+            Objects.requireNonNull(name, "Cannot unregister GaugeSet with null name");
+
+            final PrometheusLabelledGaugeWrapper existing = gaugeSets.remove(name);
+            if (existing != null) {
+                existing.unregister();
+            }
+        }
+
+        @Override
         public Summary getSummary(String name, DetailLevel detailLevel) {
             if (detailLevel == DetailLevel.BASIC) {
                 return basicSummaries.computeIfAbsent(name, (n) -> {
@@ -344,6 +376,28 @@ public class PrometheusMetricsProvider implements MetricsProvider {
 
     }
 
+    private class PrometheusLabelledCounter implements CounterSet {
+        private final String name;
+        private final io.prometheus.client.Counter inner;
+
+        public PrometheusLabelledCounter(final String name) {
+            this.name = name;
+            this.inner = io.prometheus.client.Counter
+                    .build(name, name)
+                    .labelNames(LABELS)
+                    .register(collectorRegistry);
+        }
+
+        @Override
+        public void add(final String key, final long delta) {
+            try {
+                inner.labels(key).inc(delta);
+            } catch (final IllegalArgumentException e) {
+                LOG.error("invalid delta {} for metric {} with key {}", delta, name, key, e);
+            }
+        }
+    }
+
     private class PrometheusGaugeWrapper {
 
         private final io.prometheus.client.Gauge inner;
@@ -371,10 +425,42 @@ public class PrometheusMetricsProvider implements MetricsProvider {
         private void unregister() {
             collectorRegistry.unregister(inner);
         }
+    }
+
+    /**
+     * Prometheus implementation of GaugeSet interface. It wraps the GaugeSet object and
+     * uses the callback API to update the Prometheus Gauge.
+     */
+    private class PrometheusLabelledGaugeWrapper {
+        private final GaugeSet gaugeSet;
+        private final io.prometheus.client.Gauge inner;
+
+        private PrometheusLabelledGaugeWrapper(final String name,
+                                               final GaugeSet gaugeSet,
+                                               final io.prometheus.client.Gauge prev) {
+            this.gaugeSet = gaugeSet;
+            this.inner = prev != null ? prev :
+                    io.prometheus.client.Gauge
+                            .build(name, name)
+                            .labelNames(LABELS)
+                            .register(collectorRegistry);
+        }
 
+        /**
+         * Call the callback provided by the GaugeSet and update Prometheus Gauge.
+         * This method is called when the server is polling for a value.
+         */
+        private void sample() {
+            gaugeSet.values().forEach((key, value) ->
+                this.inner.labels(key).set(value != null ? value.doubleValue() : 0));
+        }
+
+        private void unregister() {
+            collectorRegistry.unregister(inner);
+        }
     }
 
-    class PrometheusSummary implements Summary {
+    private class PrometheusSummary implements Summary {
 
         private final io.prometheus.client.Summary inner;
         private final String name;
@@ -401,7 +487,7 @@ public class PrometheusMetricsProvider implements MetricsProvider {
             reportMetrics(() -> observe(delta));
         }
 
-        void observe(final long delta) {
+        private void observe(final long delta) {
             try {
                 inner.observe(delta);
             } catch (final IllegalArgumentException err) {
@@ -410,7 +496,7 @@ public class PrometheusMetricsProvider implements MetricsProvider {
         }
     }
 
-    class PrometheusLabelledSummary implements SummarySet {
+    private class PrometheusLabelledSummary implements SummarySet {
 
         private final io.prometheus.client.Summary inner;
         private final String name;
@@ -439,7 +525,7 @@ public class PrometheusMetricsProvider implements MetricsProvider {
             reportMetrics(() -> observe(key, value));
         }
 
-        void observe(final String key, final long value) {
+        private void observe(final String key, final long value) {
             try {
                 inner.labels(key).observe(value);
             } catch (final IllegalArgumentException err) {
diff --git a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java
index 94f1592..9284f52 100644
--- a/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java
+++ b/zookeeper-metrics-providers/zookeeper-prometheus-metrics/src/test/java/org/apache/zookeeper/metrics/prometheus/PrometheusMetricsProviderTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
@@ -30,16 +31,25 @@ import io.prometheus.client.CollectorRegistry;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.metrics.Counter;
+import org.apache.zookeeper.metrics.CounterSet;
 import org.apache.zookeeper.metrics.Gauge;
+import org.apache.zookeeper.metrics.GaugeSet;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.metrics.Summary;
+import org.apache.zookeeper.metrics.SummarySet;
+import org.apache.zookeeper.server.util.QuotaMetricsUtils;
 import org.hamcrest.CoreMatchers;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -58,6 +68,7 @@ public class PrometheusMetricsProviderTest {
         CollectorRegistry.defaultRegistry.clear();
         provider = new PrometheusMetricsProvider();
         Properties configuration = new Properties();
+        configuration.setProperty("numWorkerThreads", "0"); // sync behavior for test
         configuration.setProperty("httpHost", "127.0.0.1"); // local host for test
         configuration.setProperty("httpPort", "0"); // ephemeral port
         configuration.setProperty("exportJvmInfo", "false");
@@ -107,6 +118,106 @@ public class PrometheusMetricsProviderTest {
     }
 
     @Test
+    public void testCounterSet_single() throws Exception {
+        // create and register a CounterSet
+        final String name = QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE;
+        final CounterSet counterSet = provider.getRootContext().getCounterSet(name);
+        final String[] keys = {"ns1", "ns2"};
+        final int count = 3;
+
+        // update the CounterSet multiple times
+        for (int i = 0; i < count; i++) {
+            Arrays.asList(keys).forEach(key -> counterSet.inc(key));
+            Arrays.asList(keys).forEach(key -> counterSet.add(key, 2));
+        }
+
+        // validate with dump call
+        final Map<String, Number> expectedMetricsMap = new HashMap<>();
+        for (final String key : keys) {
+            expectedMetricsMap.put(String.format("%s{key=\"%s\"}", name, key), count * 3.0);
+        }
+        validateWithDump(expectedMetricsMap);
+
+        // validate with servlet call
+        final List<String> expectedNames = Collections.singletonList(String.format("# TYPE %s count", name));
+        final List<String> expectedMetrics = new ArrayList<>();
+        for (final String key : keys) {
+            expectedMetrics.add(String.format("%s{key=\"%s\",} %s", name, key, count * 3.0));
+        }
+        validateWithServletCall(expectedNames, expectedMetrics);
+
+        // validate registering with same name, no overwriting
+        assertSame(counterSet, provider.getRootContext().getCounterSet(name));
+    }
+
+    @Test
+    public void testCounterSet_multiple() throws Exception {
+        final String name = QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE;
+
+        final String[] names = new String[]{name + "_1", name + "_2"};
+        final String[] keys = new String[]{"ns21", "ns22"};
+        final int[] counts = new int[] {3, 5};
+
+        final int length = names.length;
+        final CounterSet[] counterSets = new CounterSet[length];
+
+        // create and register the CounterSets
+        for (int i = 0; i < length; i++) {
+            counterSets[i] = provider.getRootContext().getCounterSet(names[i]);
+        }
+
+        // update each CounterSet multiple times
+        for (int i = 0; i < length; i++) {
+            for (int j = 0; j < counts[i]; j++) {
+                counterSets[i].inc(keys[i]);
+            }
+        }
+
+        // validate with dump call
+        final Map<String, Number> expectedMetricsMap = new HashMap<>();
+        for (int i = 0; i < length; i++) {
+            expectedMetricsMap.put(String.format("%s{key=\"%s\"}", names[i], keys[i]), counts[i] * 1.0);
+        }
+        validateWithDump(expectedMetricsMap);
+
+        // validate with servlet call
+        final List<String> expectedNames = new ArrayList<>();
+        final List<String> expectedMetrics = new ArrayList<>();
+        for (int i = 0; i < length; i++) {
+            expectedNames.add(String.format("# TYPE %s count", names[i]));
+            expectedMetrics.add(String.format("%s{key=\"%s\",} %s", names[i], keys[i], counts[i]  * 1.0));
+        }
+        validateWithServletCall(expectedNames, expectedMetrics);
+    }
+
+    @Test
+    public void testCounterSet_registerWithNullName() {
+        assertThrows(NullPointerException.class,
+                () -> provider.getRootContext().getCounterSet(null));
+    }
+
+    @Test
+    public void testCounterSet_negativeValue() {
+        // create and register a CounterSet
+        final String name = QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE;
+        final CounterSet counterSet = provider.getRootContext().getCounterSet(name);
+
+        // add negative value and make sure no exception is thrown
+        counterSet.add("ns1", -1);
+    }
+
+    @Test
+    public void testCounterSet_nullKey() {
+        // create and register a CounterSet
+        final String name = QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE;
+        final CounterSet counterSet = provider.getRootContext().getCounterSet(name);
+
+        // increment the count with null key and make sure no exception is thrown
+        counterSet.inc(null);
+        counterSet.add(null, 2);
+    }
+
+    @Test
     public void testGauge() throws Exception {
         int[] values = {78, -89};
         int[] callCounts = {0, 0};
@@ -180,11 +291,10 @@ public class PrometheusMetricsProviderTest {
 
     @Test
     public void testBasicSummary() throws Exception {
-        final PrometheusMetricsProvider.PrometheusSummary summary =
-                (PrometheusMetricsProvider.PrometheusSummary) provider.getRootContext()
+        Summary summary = provider.getRootContext()
                 .getSummary("cc", MetricsContext.DetailLevel.BASIC);
-        summary.observe(10);
-        summary.observe(10);
+        summary.add(10);
+        summary.add(10);
         int[] count = {0};
         provider.dump((k, v) -> {
             count[0]++;
@@ -230,11 +340,10 @@ public class PrometheusMetricsProviderTest {
 
     @Test
     public void testAdvancedSummary() throws Exception {
-        final PrometheusMetricsProvider.PrometheusSummary summary =
-                (PrometheusMetricsProvider.PrometheusSummary) provider.getRootContext()
+        Summary summary = provider.getRootContext()
                 .getSummary("cc", MetricsContext.DetailLevel.ADVANCED);
-        summary.observe(10);
-        summary.observe(10);
+        summary.add(10);
+        summary.add(10);
         int[] count = {0};
         provider.dump((k, v) -> {
             count[0]++;
@@ -287,34 +396,6 @@ public class PrometheusMetricsProviderTest {
     }
 
     @Test
-    public void testSummary_sync() throws Exception {
-        final Properties config = new Properties();
-        config.setProperty("numWorkerThreads", "0");
-        config.setProperty("httpPort", "0"); // ephemeral port
-        config.setProperty("exportJvmInfo", "false");
-
-        PrometheusMetricsProvider metricsProvider = null;
-        try {
-            metricsProvider = new PrometheusMetricsProvider();
-            metricsProvider.configure(config);
-            metricsProvider.start();
-
-            final Summary summary =
-                    metricsProvider.getRootContext().getSummary("cc", MetricsContext.DetailLevel.BASIC);
-            summary.add(10);
-            summary.add(20);
-
-            final Map<String, Object> res = new HashMap<>();
-            metricsProvider.dump(res::put);
-            assertEquals(3, res.keySet().stream().filter(key -> key.startsWith("cc")).count());
-        } finally {
-            if (metricsProvider != null) {
-                metricsProvider.stop();
-            }
-        }
-    }
-
-    @Test
     public void testSummary_asyncAndExceedMaxQueueSize() throws Exception {
         final Properties config = new Properties();
         config.setProperty("numWorkerThreads", "1");
@@ -341,6 +422,54 @@ public class PrometheusMetricsProviderTest {
         }
     }
 
+    @Test
+    public void testSummarySet() throws Exception {
+        final String name = "ss";
+        final String[] keys = {"ns1", "ns2"};
+        final double count = 3.0;
+
+        // create and register a SummarySet
+        final SummarySet summarySet = provider.getRootContext()
+                .getSummarySet(name, MetricsContext.DetailLevel.BASIC);
+
+        // update the SummarySet multiple times
+        for (int i = 0; i < count; i++) {
+            Arrays.asList(keys).forEach(key -> summarySet.add(key, 1));
+        }
+
+        // validate with dump call
+        final Map<String, Number> expectedMetricsMap = new HashMap<>();
+        for (final String key : keys) {
+            expectedMetricsMap.put(String.format("%s{key=\"%s\",quantile=\"0.5\"}", name, key), 1.0);
+            expectedMetricsMap.put(String.format("%s_count{key=\"%s\"}", name, key), count);
+            expectedMetricsMap.put(String.format("%s_sum{key=\"%s\"}", name, key), count);
+        }
+        validateWithDump(expectedMetricsMap);
+
+        // validate with servlet call
+        final List<String> expectedNames = Collections.singletonList(String.format("# TYPE %s summary", name));
+        final List<String> expectedMetrics = new ArrayList<>();
+        for (final String key : keys) {
+            expectedMetrics.add(String.format("%s{key=\"%s\",quantile=\"0.5\",} %s", name, key, 1.0));
+            expectedMetrics.add(String.format("%s_count{key=\"%s\",} %s", name, key, count));
+            expectedMetrics.add(String.format("%s_sum{key=\"%s\",} %s", name, key, count));
+        }
+        validateWithServletCall(expectedNames, expectedMetrics);
+
+        // validate registering with same name, no overwriting
+        assertSame(summarySet, provider.getRootContext()
+                .getSummarySet(name, MetricsContext.DetailLevel.BASIC));
+
+        // validate registering with different DetailLevel, not allowed
+        try {
+            provider.getRootContext()
+                    .getSummarySet(name, MetricsContext.DetailLevel.ADVANCED);
+            fail("Can't get the same summarySet with a different DetailLevel");
+        } catch (final IllegalArgumentException e) {
+            assertThat(e.getMessage(), containsString("Already registered"));
+        }
+    }
+
     private String callServlet() throws ServletException, IOException {
         // we are not performing an HTTP request
         // but we are calling directly the servlet
@@ -353,4 +482,198 @@ public class PrometheusMetricsProviderTest {
         return res;
     }
 
+    @Test
+    public void testGaugeSet_singleGaugeSet() throws Exception {
+        final String name = QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE;
+        final Number[] values = {10.0, 100.0};
+        final String[] keys = {"ns11", "ns12"};
+        final Map<String, Number> metricsMap = new HashMap<>();
+        for (int i = 0; i < values.length; i++) {
+            metricsMap.put(keys[i], values[i]);
+        }
+        final AtomicInteger callCount = new AtomicInteger(0);
+
+        // create and register GaugeSet
+        createAndRegisterGaugeSet(name, metricsMap, callCount);
+
+        // validate with dump call
+        final Map<String, Number> expectedMetricsMap = new HashMap<>();
+        for (int i = 0; i < values.length; i++) {
+            expectedMetricsMap.put(String.format("%s{key=\"%s\"}", name, keys[i]), values[i]);
+        }
+        validateWithDump(expectedMetricsMap);
+        assertEquals(1, callCount.get());
+
+        // validate with servlet call
+        final List<String> expectedNames = Collections.singletonList(String.format("# TYPE %s gauge", name));
+        final List<String> expectedMetrics = new ArrayList<>();
+        for (int i = 0; i < values.length; i++) {
+            expectedMetrics.add(String.format("%s{key=\"%s\",} %s", name, keys[i], values[i]));
+        }
+        validateWithServletCall(expectedNames, expectedMetrics);
+        assertEquals(2, callCount.get());
+
+        // unregister the GaugeSet
+        callCount.set(0);
+        provider.getRootContext().unregisterGaugeSet(name);
+
+        // validate with dump call
+        validateWithDump(Collections.emptyMap());
+        assertEquals(0, callCount.get());
+
+        // validate with servlet call
+        validateWithServletCall(new ArrayList<>(), new ArrayList<>());
+        assertEquals(0, callCount.get());
+    }
+
+    @Test
+    public void testGaugeSet_multipleGaugeSets() throws Exception {
+        final String[] names = new String[] {
+                QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE,
+                QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE
+        };
+
+        final Number[] values = new Number[] {20.0, 200.0};
+        final String[] keys = new String[]{"ns21", "ns22"};
+        final int count = names.length;
+        final AtomicInteger[] callCounts = new AtomicInteger[count];
+
+        // create and register the GaugeSets
+        for (int i = 0; i < count; i++) {
+            final Map<String, Number> metricsMap = new HashMap<>();
+            metricsMap.put(keys[i], values[i]);
+            callCounts[i] = new AtomicInteger(0);
+            createAndRegisterGaugeSet(names[i], metricsMap, callCounts[i]);
+        }
+
+        // validate with dump call
+        final Map<String, Number> expectedMetricsMap = new HashMap<>();
+        for (int i = 0; i < count; i++) {
+            expectedMetricsMap.put(String.format("%s{key=\"%s\"}", names[i], keys[i]), values[i]);
+        }
+        validateWithDump(expectedMetricsMap);
+        for (int i = 0; i < count; i++) {
+            assertEquals(1, callCounts[i].get());
+        }
+
+        // validate with servlet call
+        final List<String> expectedNames = new ArrayList<>();
+        final List<String> expectedMetrics = new ArrayList<>();
+        for (int i = 0; i < count; i++) {
+            expectedNames.add(String.format("# TYPE %s gauge", names[i]));
+            expectedMetrics.add(String.format("%s{key=\"%s\",} %s", names[i], keys[i], values[i]));
+        }
+        validateWithServletCall(expectedNames, expectedMetrics);
+        for (int i = 0; i < count; i++) {
+            assertEquals(2, callCounts[i].get());
+        }
+
+        // unregister the GaugeSets
+        for (int i = 0; i < count; i++) {
+            callCounts[i].set(0);
+            provider.getRootContext().unregisterGaugeSet(names[i]);
+        }
+
+        // validate with dump call
+        validateWithDump(Collections.emptyMap());
+        for (int i = 0; i < count; i++) {
+            assertEquals(0, callCounts[i].get());
+        }
+
+        // validate with servlet call
+        validateWithServletCall(new ArrayList<>(), new ArrayList<>());
+        for (int i = 0; i < count; i++) {
+            assertEquals(0, callCounts[i].get());
+        }
+    }
+
+    @Test
+    public void testGaugeSet_overwriteRegister() {
+        final String[] names = new String[] {
+                QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE,
+                QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE
+        };
+
+        final int count = names.length;
+        final Number[] values = new Number[]{30.0, 300.0};
+        final String[] keys = new String[] {"ns31", "ns32"};
+        final AtomicInteger[] callCounts = new AtomicInteger[count];
+
+        // create and register the GaugeSets
+        for (int i = 0; i < count; i++) {
+            final Map<String, Number> metricsMap = new HashMap<>();
+            metricsMap.put(keys[i], values[i]);
+            callCounts[i] = new AtomicInteger(0);
+            // use the same name so the first GaugeSet got overwrite
+            createAndRegisterGaugeSet(names[0], metricsMap, callCounts[i]);
+        }
+
+        // validate with dump call to make sure the second GaugeSet overwrites the first
+        final Map<String, Number> expectedMetricsMap = new HashMap<>();
+        expectedMetricsMap.put(String.format("%s{key=\"%s\"}", names[0], keys[1]), values[1]);
+        validateWithDump(expectedMetricsMap);
+        assertEquals(0, callCounts[0].get());
+        assertEquals(1, callCounts[1].get());
+    }
+
+    @Test
+    public void testGaugeSet_nullKey() {
+        final String name = QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE;
+        final Map<String, Number> metricsMap = new HashMap<>();
+        metricsMap.put(null, 10.0);
+
+        final AtomicInteger callCount = new AtomicInteger(0);
+
+        // create and register GaugeSet
+        createAndRegisterGaugeSet(name, metricsMap, callCount);
+
+        // validate with dump call
+        assertThrows(IllegalArgumentException.class, () -> provider.dump(new HashMap<>()::put));
+
+        // validate with servlet call
+        assertThrows(IllegalArgumentException.class, this::callServlet);
+    }
+
+    @Test
+    public void testGaugeSet_registerWithNullGaugeSet() {
+        assertThrows(NullPointerException.class,
+                () -> provider.getRootContext().registerGaugeSet("name", null));
+
+        assertThrows(NullPointerException.class,
+                () -> provider.getRootContext().registerGaugeSet(null, HashMap::new));
+    }
+
+    @Test
+    public void testGaugeSet_unregisterNull() {
+        assertThrows(NullPointerException.class,
+                () -> provider.getRootContext().unregisterGaugeSet(null));
+    }
+
+    private void createAndRegisterGaugeSet(final String name,
+                                           final Map<String, Number> metricsMap,
+                                           final AtomicInteger callCount) {
+        final GaugeSet gaugeSet = () -> {
+            callCount.addAndGet(1);
+            return metricsMap;
+        };
+        provider.getRootContext().registerGaugeSet(name, gaugeSet);
+    }
+
+    private void validateWithDump(final Map<String, Number> expectedMetrics) {
+        final Map<String, Object> returnedMetrics = new HashMap<>();
+        provider.dump(returnedMetrics::put);
+        assertEquals(expectedMetrics.size(), returnedMetrics.size());
+        expectedMetrics.forEach((key, value) -> assertEquals(value, returnedMetrics.get(key)));
+    }
+
+    private void validateWithServletCall(final List<String> expectedNames,
+                                         final List<String> expectedMetrics) throws Exception {
+        final String response = callServlet();
+        if (expectedNames.isEmpty() && expectedMetrics.isEmpty()) {
+            assertTrue(response.isEmpty());
+        } else {
+            expectedNames.forEach(name -> assertThat(response, CoreMatchers.containsString(name)));
+            expectedMetrics.forEach(metric -> assertThat(response, CoreMatchers.containsString(metric)));
+        }
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java b/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java
index aaea1d9..02fffb7 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/StatsTrack.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.regex.Pattern;
 import org.apache.zookeeper.common.StringUtils;
 
@@ -215,4 +216,21 @@ public class StatsTrack {
     public byte[] getStatsBytes() {
         return toString().getBytes(StandardCharsets.UTF_8);
     }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) { // same instance
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) { // null or a different runtime class is never equal
+            return false;
+        }
+        final StatsTrack that = (StatsTrack) o;
+        return Objects.equals(stats, that.stats); // equality is decided by the stats field alone
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(stats); // hashes only the stats field, the same field equals() compares
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java
index 114df6e..7ddc557 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/PathUtils.java
@@ -109,4 +109,18 @@
          return path;
      }
 
+     /**
+      * Returns the top namespace of a znode path, i.e. the second element of {@code path.split("/")}.
+      *
+      * @param path znode path string, may be null
+      *
+      * @return the top namespace, or null if path is null or the split yields fewer than two parts
+      */
+     public static String getTopNamespace(final String path) {
+         if (path == null) {
+             return null;
+         }
+         final String[] parts = path.split("/"); // for "/n0/n1" this is ["", "n0", "n1"]
+         return parts.length > 1 ? parts[1] : null; // parts[0] is the text before the first '/'
+     }
  }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/CounterSet.java b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/CounterSet.java
new file mode 100644
index 0000000..c9c7c13
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/CounterSet.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.metrics;
+
+/**
+ * A counter refers to a value which can only increase.
+ * Usually the value is reset when the process starts.
+ *
+ * A CounterSet is a set of {@link Counter}s grouped by key.
+ */
+
+public interface CounterSet {
+    /**
+     * Increments the value for the given key by one; the default implementation calls {@code add(key, 1L)}.
+     * <p>This method is thread safe; the MetricsProvider will take care of synchronization.</p>
+     *
+     * @param key the key whose count is incremented
+     */
+    default void inc(String key) {
+        add(key, 1L);
+    }
+
+    /**
+     * Increments the value for the given key by the given amount.
+     * <p>This method is thread safe; the MetricsProvider will take care of synchronization.</p>
+     *
+     * @param key the key whose count is incremented
+     * @param delta amount to increment; this cannot be a negative number.
+     */
+    void add(String key, long delta);
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/GaugeSet.java b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/GaugeSet.java
new file mode 100644
index 0000000..15fd93f
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/GaugeSet.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.metrics;
+
+import java.util.Map;
+
+/**
+ * A Gauge is an application provided object which will be called by the metrics framework to sample a numeric value.
+ *
+ * A GaugeSet is a set of {@link Gauge}s grouped by key.
+ */
+public interface GaugeSet {
+    /**
+     * Returns all values and the associated keys of the GaugeSet.
+     * The MetricsProvider will call this callback without taking care of synchronization; it is up to the application
+     * to handle thread safety.
+     *
+     * @return a map from each key of the GaugeSet to its value
+     */
+    Map<String, Number> values();
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/MetricsContext.java b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/MetricsContext.java
index 3ea8f8c..18ff118 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/MetricsContext.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/MetricsContext.java
@@ -51,6 +51,15 @@ public interface MetricsContext {
     Counter getCounter(String name);
 
     /**
+     * Returns the CounterSet identified by the given name.
+     * Null name is not allowed.
+     *
+     * @param name the name of the CounterSet in this context; must not be null
+     * @return CounterSet identified by the name in this context.
+     */
+    CounterSet getCounterSet(String name);
+
+    /**
      * Registers an user provided {@link Gauge} which will be called by the
      * MetricsProvider in order to sample an integer value.
      * If another Gauge was already registered the new one will
@@ -71,6 +80,27 @@ public interface MetricsContext {
      */
     void unregisterGauge(String name);
 
+    /**
+     * Registers a user provided {@link GaugeSet} which will be called by the
+     * MetricsProvider in order to sample numeric values.
+     * If another GaugeSet was already registered, the new one will take its place.
+     * Registering with a null name or null callback is not allowed.
+     *
+     * @param name unique name of the GaugeSet in this context; must not be null
+     * @param gaugeSet the implementation of the GaugeSet; must not be null
+     *
+     */
+    void registerGaugeSet(String name, GaugeSet gaugeSet);
+
+    /**
+     * Unregisters the user provided {@link GaugeSet} bound to the given name.
+     * Unregistering with a null name is not allowed.
+     *
+     * @param name unique name of the GaugeSet in this context; must not be null
+     *
+     */
+    void unregisterGaugeSet(String name);
+
     enum DetailLevel {
         /**
          * The returned Summary is expected to track only simple aggregated
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/DefaultMetricsProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/DefaultMetricsProvider.java
index 85f86db..634ec25 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/DefaultMetricsProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/DefaultMetricsProvider.java
@@ -24,7 +24,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.function.BiConsumer;
 import org.apache.zookeeper.metrics.Counter;
+import org.apache.zookeeper.metrics.CounterSet;
 import org.apache.zookeeper.metrics.Gauge;
+import org.apache.zookeeper.metrics.GaugeSet;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.metrics.MetricsProvider;
 import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
@@ -35,6 +37,7 @@ import org.apache.zookeeper.server.metric.AvgMinMaxCounterSet;
 import org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounter;
 import org.apache.zookeeper.server.metric.AvgMinMaxPercentileCounterSet;
 import org.apache.zookeeper.server.metric.SimpleCounter;
+import org.apache.zookeeper.server.metric.SimpleCounterSet;
 
 /**
  * Default implementation of {@link MetricsProvider}.<br>
@@ -64,6 +67,7 @@ public class DefaultMetricsProvider implements MetricsProvider {
     public void stop() {
         // release all references to external objects
         rootMetricsContext.gauges.clear();
+        rootMetricsContext.gaugeSets.clear();
     }
 
     @Override
@@ -79,7 +83,9 @@ public class DefaultMetricsProvider implements MetricsProvider {
     private static final class DefaultMetricsContext implements MetricsContext {
 
         private final ConcurrentMap<String, Gauge> gauges = new ConcurrentHashMap<>();
+        private final ConcurrentMap<String, GaugeSet> gaugeSets = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, SimpleCounter> counters = new ConcurrentHashMap<>();
+        private final ConcurrentMap<String, SimpleCounterSet> counterSets = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, AvgMinMaxCounter> basicSummaries = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, AvgMinMaxPercentileCounter> summaries = new ConcurrentHashMap<>();
         private final ConcurrentMap<String, AvgMinMaxCounterSet> basicSummarySets = new ConcurrentHashMap<>();
@@ -99,6 +105,12 @@ public class DefaultMetricsProvider implements MetricsProvider {
         }
 
         @Override
+        public CounterSet getCounterSet(final String name) {
+            Objects.requireNonNull(name, "Cannot register a CounterSet with null name"); // reject a null name up front
+            return counterSets.computeIfAbsent(name, SimpleCounterSet::new); // create on first request, reuse afterwards
+        }
+
+        @Override
         public void registerGauge(String name, Gauge gauge) {
             Objects.requireNonNull(gauge, "Cannot register a null Gauge for " + name);
             gauges.put(name, gauge);
@@ -110,6 +122,19 @@ public class DefaultMetricsProvider implements MetricsProvider {
         }
 
         @Override
+        public void registerGaugeSet(final String name, final GaugeSet gaugeSet) {
+            Objects.requireNonNull(name, "Cannot register a GaugeSet with null name"); // name is checked first, so the next message never prints "null"
+            Objects.requireNonNull(gaugeSet, "Cannot register a null GaugeSet for " + name);
+            gaugeSets.put(name, gaugeSet); // put() replaces any GaugeSet previously stored under the same name
+        }
+
+        @Override
+        public void unregisterGaugeSet(final String name) {
+            Objects.requireNonNull(name, "Cannot unregister GaugeSet with null name"); // reject a null name up front
+            gaugeSets.remove(name); // no effect when nothing is stored under name
+        }
+
+        @Override
         public Summary getSummary(String name, DetailLevel detailLevel) {
             if (detailLevel == DetailLevel.BASIC) {
                 return basicSummaries.computeIfAbsent(name, (n) -> {
@@ -154,9 +179,21 @@ public class DefaultMetricsProvider implements MetricsProvider {
                     sink.accept(name, value);
                 }
             });
+
+            gaugeSets.forEach((name, gaugeSet) ->
+                gaugeSet.values().forEach((key, value) -> {
+                    if (key != null) {
+                        sink.accept(key + "_" + name, value != null ? value : 0);
+                    }
+                })
+            );
+
             counters.values().forEach(metric -> {
                 metric.values().forEach(sink);
             });
+            counterSets.values().forEach(metric -> {
+                metric.values().forEach(sink);
+            });
             basicSummaries.values().forEach(metric -> {
                 metric.values().forEach(sink);
             });
@@ -175,6 +212,9 @@ public class DefaultMetricsProvider implements MetricsProvider {
             counters.values().forEach(metric -> {
                 metric.reset();
             });
+            counterSets.values().forEach(metric -> {
+                metric.reset();
+            });
             basicSummaries.values().forEach(metric -> {
                 metric.reset();
             });
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/NullMetricsProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/NullMetricsProvider.java
index 30a1079..8d07e91 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/NullMetricsProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/metrics/impl/NullMetricsProvider.java
@@ -21,7 +21,9 @@ package org.apache.zookeeper.metrics.impl;
 import java.util.Properties;
 import java.util.function.BiConsumer;
 import org.apache.zookeeper.metrics.Counter;
+import org.apache.zookeeper.metrics.CounterSet;
 import org.apache.zookeeper.metrics.Gauge;
+import org.apache.zookeeper.metrics.GaugeSet;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.metrics.MetricsProvider;
 import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
@@ -78,6 +80,11 @@ public class NullMetricsProvider implements MetricsProvider {
         }
 
         @Override
+        public CounterSet getCounterSet(final String name) {
+            return NullCounterSet.INSTANCE; // the same shared instance for every name; name is not used
+        }
+
+        @Override
         public void registerGauge(String name, Gauge gauge) {
         }
 
@@ -86,6 +93,14 @@ public class NullMetricsProvider implements MetricsProvider {
         }
 
         @Override
+        public void registerGaugeSet(final String name, final GaugeSet gaugeSet) { // empty body: nothing is stored
+        }
+
+        @Override
+        public void unregisterGaugeSet(final String name) { // empty body: there is nothing to remove
+        }
+
+        @Override
         public Summary getSummary(String name, DetailLevel detailLevel) {
             return NullSummary.INSTANCE;
         }
@@ -112,6 +127,15 @@ public class NullMetricsProvider implements MetricsProvider {
 
     }
 
+    private static final class NullCounterSet implements CounterSet { // CounterSet whose add() has an empty body
+
+        private static final NullCounterSet INSTANCE = new NullCounterSet(); // single shared instance
+
+        @Override
+        public void add(final String key, final long delta) { // empty body: the delta is not recorded anywhere
+        }
+    }
+
     private static final class NullSummary implements Summary {
 
         private static final NullSummary INSTANCE = new NullSummary();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
index fc556a5..2818e15 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java
@@ -56,6 +56,7 @@ import org.apache.zookeeper.audit.AuditConstants;
 import org.apache.zookeeper.audit.AuditEvent.Result;
 import org.apache.zookeeper.audit.ZKAuditProvider;
 import org.apache.zookeeper.common.PathTrie;
+import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.StatPersisted;
@@ -1628,13 +1629,8 @@ public class DataTree {
         return aclCache;
     }
 
-    private String getTopNamespace(String path) {
-        String[] parts = path.split("/");
-        return parts.length > 1 ? parts[1] : null;
-    }
-
     private void updateReadStat(String path, long bytes) {
-        String namespace = getTopNamespace(path);
+        final String namespace = PathUtils.getTopNamespace(path);
         if (namespace == null) {
             return;
         }
@@ -1643,7 +1639,7 @@ public class DataTree {
     }
 
     private void updateWriteStat(String path, long bytes) {
-        String namespace = getTopNamespace(path);
+        final String namespace = PathUtils.getTopNamespace(path);
         if (namespace == null) {
             return;
         }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index 8ef5235..af15654 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server;
 
 import org.apache.zookeeper.metrics.Counter;
+import org.apache.zookeeper.metrics.CounterSet;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.metrics.MetricsContext.DetailLevel;
 import org.apache.zookeeper.metrics.MetricsProvider;
@@ -26,6 +27,7 @@ import org.apache.zookeeper.metrics.Summary;
 import org.apache.zookeeper.metrics.SummarySet;
 import org.apache.zookeeper.metrics.impl.DefaultMetricsProvider;
 import org.apache.zookeeper.metrics.impl.NullMetricsProvider;
+import org.apache.zookeeper.server.util.QuotaMetricsUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -260,6 +262,8 @@ public final class ServerMetrics {
         WATCH_BYTES = metricsContext.getCounter("watch_bytes");
 
         JVM_PAUSE_TIME = metricsContext.getSummary("jvm_pause_time_ms", DetailLevel.ADVANCED);
+
+        QUOTA_EXCEEDED_ERROR_PER_NAMESPACE = metricsContext.getCounterSet(QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE);
     }
 
     /**
@@ -511,6 +515,8 @@ public final class ServerMetrics {
 
     public final Summary JVM_PAUSE_TIME;
 
+    public final CounterSet QUOTA_EXCEEDED_ERROR_PER_NAMESPACE;
+
     private final MetricsProvider metricsProvider;
 
     public void resetAll() {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 51d5db6..86c13ae 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -52,6 +52,7 @@ import org.apache.zookeeper.Version;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.ZookeeperBanner;
+import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.common.StringUtils;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
@@ -82,6 +83,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
 import org.apache.zookeeper.server.util.JvmPauseMonitor;
 import org.apache.zookeeper.server.util.OSMXBean;
+import org.apache.zookeeper.server.util.QuotaMetricsUtils;
 import org.apache.zookeeper.server.util.RequestPathMetricsCollector;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.TxnDigest;
@@ -1913,6 +1915,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         rootContext.registerGauge("auth_failed_count", stats::getAuthFailedCount);
         rootContext.registerGauge("non_mtls_remote_conn_count", stats::getNonMTLSRemoteConnCount);
         rootContext.registerGauge("non_mtls_local_conn_count", stats::getNonMTLSLocalConnCount);
+
+        rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE,
+                () -> QuotaMetricsUtils.getQuotaCountLimit(zkDb.getDataTree()));
+        rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE,
+                () -> QuotaMetricsUtils.getQuotaBytesLimit(zkDb.getDataTree()));
+        rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE,
+                () -> QuotaMetricsUtils.getQuotaCountUsage(zkDb.getDataTree()));
+        rootContext.registerGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE,
+                () -> QuotaMetricsUtils.getQuotaBytesUsage(zkDb.getDataTree()));
     }
 
     protected void unregisterMetrics() {
@@ -1952,7 +1963,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         rootContext.unregisterGauge("non_mtls_remote_conn_count");
         rootContext.unregisterGauge("non_mtls_local_conn_count");
 
-
+        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE);
+        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE);
+        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE);
+        rootContext.unregisterGaugeSet(QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE);
     }
 
     /**
@@ -2038,12 +2052,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             return;
         }
 
+        final String namespace = PathUtils.getTopNamespace(path);
         switch (type) {
             case OpCode.create:
-                checkQuota(lastPrefix, dataBytes, 1);
+                checkQuota(lastPrefix, dataBytes, 1, namespace);
                 break;
             case OpCode.setData:
-                checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0);
+                checkQuota(lastPrefix, dataBytes - (lastData == null ? 0 : lastData.length), 0, namespace);
                 break;
              default:
                  throw new IllegalArgumentException("Unsupported OpCode for checkQuota: " + type);
@@ -2059,8 +2074,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      *            the diff to be added to number of bytes
      * @param countDiff
      *            the diff to be added to the count
+     * @param namespace
+     *            the namespace for collecting quota exceeded errors
      */
-    private void checkQuota(String lastPrefix, long bytesDiff, long countDiff)
+    private void checkQuota(String lastPrefix, long bytesDiff, long countDiff, String namespace)
             throws KeeperException.QuotaExceededException {
         LOG.debug("checkQuota: lastPrefix={}, bytesDiff={}, countDiff={}", lastPrefix, bytesDiff, countDiff);
 
@@ -2108,6 +2125,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 String msg = "Quota exceeded: " + lastPrefix + " [current count=" + newCount + ", " + (isCountHardLimit ? "hard" : "soft") + "CountLimit=" + countLimit + "]";
                 RATE_LOGGER.rateLimitLog(msg);
                 if (isCountHardLimit) {
+                    updateQuotaExceededMetrics(namespace);
                     throw new KeeperException.QuotaExceededException(lastPrefix);
                 }
             }
@@ -2122,6 +2140,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 String msg = "Quota exceeded: " + lastPrefix + " [current bytes=" + newBytes + ", " + (isByteHardLimit ? "hard" : "soft") + "ByteLimit=" + byteLimit + "]";
                 RATE_LOGGER.rateLimitLog(msg);
                 if (isByteHardLimit) {
+                    updateQuotaExceededMetrics(namespace);
                     throw new KeeperException.QuotaExceededException(lastPrefix);
                 }
             }
@@ -2306,4 +2325,11 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return zkShutdownHandler;
     }
 
+    static void updateQuotaExceededMetrics(final String namespace) {
+        if (namespace == null) { // no namespace to attribute the error to: record nothing
+            return;
+        }
+        ServerMetrics.getMetrics().QUOTA_EXCEEDED_ERROR_PER_NAMESPACE.add(namespace, 1); // add one to this namespace's counter
+    }
 }
+
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/metric/SimpleCounterSet.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/metric/SimpleCounterSet.java
new file mode 100644
index 0000000..63166de
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/metric/SimpleCounterSet.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.metric;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.zookeeper.metrics.CounterSet;
+
+/**
+ * Represents a set of counters identified by different keys.
+ * The counter is thread-safe; the per-key counters are kept in a ConcurrentHashMap.
+ */
+public class SimpleCounterSet extends Metric implements CounterSet {
+    private final String name; // appended after "<key>_" to form the name of each per-key counter
+    private final ConcurrentHashMap<String, SimpleCounter> counters = new ConcurrentHashMap<>();
+
+    public SimpleCounterSet(final String name) {
+        this.name = name;
+    }
+
+    @Override
+    public void add(final String key, final long delta) {
+        final SimpleCounter counter = counters.computeIfAbsent(key, (k) -> new SimpleCounter(k + "_" + name));
+        counter.add(delta); // the counter for a key is created on first use (line above) and then reused
+    }
+
+    @Override
+    public void reset() {
+        counters.values().forEach(SimpleCounter::reset); // resets each per-key counter; no key is removed from the map
+    }
+
+    @Override
+    public Map<String, Object> values() {
+        final Map<String, Object> m = new LinkedHashMap<>(); // merged result of values() from every per-key counter
+        counters.values().forEach(counter -> m.putAll(counter.values()));
+        return m;
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/QuotaMetricsUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/QuotaMetricsUtils.java
new file mode 100644
index 0000000..f989b34
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/util/QuotaMetricsUtils.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+
+public final class QuotaMetricsUtils { // static helpers that read per-namespace quota figures out of a DataTree
+    public static final String QUOTA_COUNT_LIMIT_PER_NAMESPACE = "quota_count_limit_per_namespace";
+    public static final String QUOTA_BYTES_LIMIT_PER_NAMESPACE = "quota_bytes_limit_per_namespace";
+    public static final String QUOTA_COUNT_USAGE_PER_NAMESPACE = "quota_count_usage_per_namespace";
+    public static final String QUOTA_BYTES_USAGE_PER_NAMESPACE = "quota_bytes_usage_per_namespace";
+    public static final String QUOTA_EXCEEDED_ERROR_PER_NAMESPACE = "quota_exceeded_error_per_namespace";
+
+    enum QUOTA_LIMIT_USAGE_METRIC_TYPE {QUOTA_COUNT_LIMIT, QUOTA_BYTES_LIMIT, QUOTA_COUNT_USAGE, QUOTA_BYTES_USAGE}
+    static final String LIMIT_END_STRING = "/" + Quotas.limitNode; // path suffix shouldCollect() matches for the *_LIMIT types
+    static final String STATS_END_STRING = "/" + Quotas.statNode; // path suffix shouldCollect() matches for the *_USAGE types
+
+    private QuotaMetricsUtils() {
+    }
+
+    /**
+     * Traverses the quota subtree and returns the quota count limit per top namespace.
+     * Values of quota nodes that share a top namespace are added together.
+     *
+     * @param dataTree dataTree that contains the quota limit and usage data; a null tree yields an empty map
+     * @return a map with top namespace as the key and quota count limit as the value
+     */
+    public static Map<String, Number> getQuotaCountLimit(final DataTree dataTree) {
+        return getQuotaLimitOrUsage(dataTree, QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_LIMIT);
+    }
+
+    /**
+     * Traverses the quota subtree and returns the quota bytes limit per top namespace.
+     * Values of quota nodes that share a top namespace are added together.
+     *
+     * @param dataTree dataTree that contains the quota limit and usage data; a null tree yields an empty map
+     * @return a map with top namespace as the key and quota bytes limit as the value
+     */
+    public static Map<String, Number> getQuotaBytesLimit(final DataTree dataTree) {
+        return getQuotaLimitOrUsage(dataTree, QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_LIMIT);
+    }
+
+    /**
+     * Traverses the quota subtree and returns the quota count usage per top namespace.
+     * Values of quota nodes that share a top namespace are added together.
+     *
+     * @param dataTree dataTree that contains the quota limit and usage data; a null tree yields an empty map
+     * @return a map with top namespace as the key and quota count usage as the value
+     */
+    public static Map<String, Number> getQuotaCountUsage(final DataTree dataTree) {
+        return getQuotaLimitOrUsage(dataTree, QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_USAGE);
+    }
+
+    /**
+     * Traverses the quota subtree and returns the quota bytes usage per top namespace.
+     * Values of quota nodes that share a top namespace are added together.
+     *
+     * @param dataTree dataTree that contains the quota limit and usage data; a null tree yields an empty map
+     * @return a map with top namespace as the key and quota bytes usage as the value
+     */
+    public static Map<String, Number> getQuotaBytesUsage(final DataTree dataTree) {
+        return getQuotaLimitOrUsage(dataTree, QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_USAGE);
+    }
+
+    // starts the traversal at Quotas.quotaZookeeper; the returned map stays empty when dataTree is null
+    private static Map<String, Number> getQuotaLimitOrUsage(final DataTree dataTree,
+                                                            final QUOTA_LIMIT_USAGE_METRIC_TYPE type) {
+        final Map<String, Number> metricsMap = new ConcurrentHashMap<>();
+        if (dataTree != null) {
+            getQuotaLimitOrUsage(Quotas.quotaZookeeper, metricsMap, type, dataTree);
+        }
+        return metricsMap;
+    }
+
+    private static void getQuotaLimitOrUsage(final String path,
+                                     final Map<String, Number> metricsMap,
+                                     final QUOTA_LIMIT_USAGE_METRIC_TYPE type,
+                                     final DataTree dataTree) {
+        final DataNode node = dataTree.getNode(path);
+        if (node == null) { // no node at this path: nothing to collect and nothing to descend into
+            return;
+        }
+        final String[] children;
+        synchronized (node) { // copy the child names into an array while holding the node's monitor
+            children = node.getChildren().toArray(new String[0]);
+        }
+        if (children.length == 0) { // childless node: collect it when its path suffix matches the requested type
+            if (shouldCollect(path, type)) {
+                collectQuotaLimitOrUsage(path, node, metricsMap, type);
+            }
+            return;
+        }
+        for (final String child : children) { // node with children: recurse into each child path
+            getQuotaLimitOrUsage(path + "/" + child, metricsMap, type, dataTree);
+        }
+    }
+
+    static boolean shouldCollect(final String path, final QUOTA_LIMIT_USAGE_METRIC_TYPE type) {
+        return path.endsWith(LIMIT_END_STRING) // limit suffix pairs with the two *_LIMIT types
+                && (QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_LIMIT == type || QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_LIMIT == type)
+                || path.endsWith(STATS_END_STRING) // stats suffix pairs with the two *_USAGE types; '&&' binds tighter than '||'
+                && (QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_USAGE == type || QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_USAGE == type);
+    }
+
+    static void collectQuotaLimitOrUsage(final String path,
+                                         final DataNode node,
+                                         final Map<String, Number> metricsMap,
+                                         final QUOTA_LIMIT_USAGE_METRIC_TYPE type) {
+        final String namespace = PathUtils.getTopNamespace(Quotas.trimQuotaPath(path));
+        if (namespace == null) { // no top namespace for this path: nothing is recorded
+            return;
+        }
+        final byte[] data = node.getData();
+        if (data == null) { // node has no data to parse: nothing is recorded
+            return;
+        }
+        final StatsTrack statsTrack = new StatsTrack(data); // parse the node data as a StatsTrack
+        switch (type) {
+            case QUOTA_COUNT_LIMIT: // count hard limit when it is > -1, otherwise getCount()
+                aggregateQuotaLimitOrUsage(namespace, metricsMap, getQuotaLimit(statsTrack.getCountHardLimit(), statsTrack.getCount()));
+                break;
+            case QUOTA_BYTES_LIMIT: // byte hard limit when it is > -1, otherwise getBytes()
+                aggregateQuotaLimitOrUsage(namespace, metricsMap, getQuotaLimit(statsTrack.getByteHardLimit(), statsTrack.getBytes()));
+                break;
+            case QUOTA_COUNT_USAGE:
+                aggregateQuotaLimitOrUsage(namespace, metricsMap, statsTrack.getCount());
+                break;
+            case QUOTA_BYTES_USAGE:
+                aggregateQuotaLimitOrUsage(namespace, metricsMap, statsTrack.getBytes());
+                break;
+            default: // all four enum constants are handled above; nothing to do here
+        }
+    }
+
+    // hard limit takes precedence if specified (greater than -1); otherwise limit is returned
+    static long getQuotaLimit(final long hardLimit, final long limit) {
+        return hardLimit > -1 ? hardLimit : limit;
+    }
+
+    private static void aggregateQuotaLimitOrUsage(final String namespace,
+                                           final Map<String, Number> metricsMap,
+                                           final long limitOrUsage) {
+        metricsMap.put(namespace, metricsMap.getOrDefault(namespace, 0).longValue() + limitOrUsage); // running sum per namespace, starting at 0
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/PathUtilsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/PathUtilsTest.java
index 25bb55d..fb4f4bd 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/common/PathUtilsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/PathUtilsTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.zookeeper.common;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import org.apache.zookeeper.ZKTestCase;
 import org.junit.jupiter.api.Test;
@@ -156,4 +158,11 @@ public class PathUtilsTest extends ZKTestCase {
         });
     }
 
+    @Test
+    public void testGetTopNamespace() { // expected results of PathUtils.getTopNamespace for a nested, root, empty and null path
+        assertEquals("n0", PathUtils.getTopNamespace("/n0/n1/n2/n3")); // first path component is returned
+        assertNull(PathUtils.getTopNamespace("/")); // root path: no namespace
+        assertNull(PathUtils.getTopNamespace("")); // empty path: no namespace
+        assertNull(PathUtils.getTopNamespace(null)); // null path: no namespace
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java
index 143938b..bd14120 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server;
 
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -27,11 +28,15 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.QuotaMetricsUtils;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.jupiter.api.Test;
 
@@ -164,4 +169,25 @@ public class ZooKeeperServerTest extends ZKTestCase {
         assertEquals(e.getReason(), ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
     }
 
+    @Test
+    public void testUpdateQuotaExceededMetrics() { // each updateQuotaExceededMetrics call should add one to the namespace's metric
+        final String name = QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE;
+        final String namespace = UUID.randomUUID().toString(); // random namespace, presumably to avoid clashing with metrics from other tests
+        final long count = 3L; // number of updates issued below
+
+        for (int i = 0; i < count; i++) {
+            ZooKeeperServer.updateQuotaExceededMetrics(namespace);
+        }
+
+        final Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        assertEquals(1, values.keySet().stream().filter( // exactly one metric key contains "<namespace>_<name>"
+                key -> key.contains(String.format("%s_%s", namespace, name))).count());
+
+        assertEquals(count, values.get(String.format("%s_%s", namespace, name))); // its value equals the number of updates
+    }
+
+    @Test
+    public void testUpdateQuotaExceededMetrics_nullNamespace() {
+        assertDoesNotThrow(() -> ZooKeeperServer.updateQuotaExceededMetrics(null)); // a null namespace must not raise
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/metric/SimpleCounterSetTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/metric/SimpleCounterSetTest.java
new file mode 100644
index 0000000..3260475
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/metric/SimpleCounterSetTest.java
@@ -0,0 +1,61 @@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.metric;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.util.Map;
+import org.apache.zookeeper.ZKTestCase;
+import org.junit.jupiter.api.Test;
+
+
+public class SimpleCounterSetTest extends ZKTestCase { // checks the per-key values reported by SimpleCounterSet
+    @Test
+    public void testValues() { // values() should report the accumulated total of every key
+        final SimpleCounterSet simpleCounterSet = createSimpleCounterSetAddData("test1");
+        final Map<String, Object> values = simpleCounterSet.values();
+
+        assertEquals(2, values.size()); // one entry per key added by the helper
+        assertEquals(30L , values.get("key1_test1")); // 10 + 20, reported under "<key>_<name>"
+        assertEquals(70L , values.get("key2_test1")); // 30 + 40
+    }
+
+    @Test
+    public void testReset() { // after reset() both keys are still reported, each with value 0
+        final SimpleCounterSet simpleCounterSet = createSimpleCounterSetAddData("test2");
+        simpleCounterSet.reset();
+
+        final Map<String, Object> values = simpleCounterSet.values();
+
+        assertEquals(2, values.size());
+        assertEquals(0L , values.get("key1_test2"));
+        assertEquals(0L , values.get("key2_test2"));
+    }
+
+    private SimpleCounterSet createSimpleCounterSetAddData(final String name) { // builds a counter set called name holding two keys
+        final SimpleCounterSet simpleCounterSet = new SimpleCounterSet(name);
+
+        simpleCounterSet.add("key1", 10); // key1 receives 10 + 20
+        simpleCounterSet.add("key1", 20);
+
+        simpleCounterSet.add("key2", 30); // key2 receives 30 + 40
+        simpleCounterSet.add("key2", 40);
+
+        return simpleCounterSet;
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/QuotaMetricsUtilsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/QuotaMetricsUtilsTest.java
new file mode 100644
index 0000000..5e5f979
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/util/QuotaMetricsUtilsTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Quotas;
+import org.apache.zookeeper.StatsTrack;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.metrics.MetricsContext;
+import org.apache.zookeeper.metrics.MetricsProvider;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.DataNode;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.junit.jupiter.api.Test;
+
+public class QuotaMetricsUtilsTest extends ZKTestCase { // exercises the per-namespace quota limit/usage gauge sets and the QuotaMetricsUtils helpers
+    @Test
+    public void testQuotaMetrics_singleQuotaSubtree() throws Exception {
+        // register the quota gauge sets under a unique name suffix
+        final String nameSuffix = UUID.randomUUID().toString();
+        final DataTree dt = new DataTree();
+        registerQuotaMetrics(nameSuffix, dt);
+
+        // build the data tree with one quota placed directly on the top namespace node
+        final String ns = UUID.randomUUID().toString();
+
+        final long countLimit = 10;
+        final long bytesLimit = 100;
+        final long countHardLimit = 5;
+        final long bytesHardLimit = 50;
+
+        final long countUsage = 5;
+        final long bytesUsage = 40;
+
+        final StatsTrack limitTrack = buildLimitStatsTrack(countLimit, bytesLimit, countHardLimit, bytesHardLimit);
+        final StatsTrack usageTrack = buildUsageStatsTrack(countUsage, bytesUsage);
+        buildDataTree("/" + ns, limitTrack, usageTrack, dt);
+
+        // validate the quota metrics: hard limits are set, so they are the expected limit values
+        validateQuotaMetrics(ns, countHardLimit, bytesHardLimit, countUsage, bytesUsage, nameSuffix);
+    }
+
+
+    @Test
+    public void testQuotaMetrics_multipleQuotaSubtrees() throws Exception {
+        // register the quota gauge sets under a unique name suffix
+        final String nameSuffix = UUID.randomUUID().toString();
+        final DataTree dt = new DataTree();
+        registerQuotaMetrics(nameSuffix, dt);
+
+        // build the data tree with a first quota on /<ns>/a/b
+        final String ns = UUID.randomUUID().toString();
+
+        final long countLimit1 = 10;
+        final long bytesLimit1 = 100;
+        final long countHardLimit1 = 5;
+        final long bytesHardLimit1 = 50;
+
+        final long countUsage1 = 5;
+        final long bytesUsage1 = 40;
+
+        final StatsTrack limitTrack1 = buildLimitStatsTrack(countLimit1, bytesLimit1, countHardLimit1, bytesHardLimit1);
+        final StatsTrack usageTrack1 = buildUsageStatsTrack(countUsage1, bytesUsage1);
+
+        buildDataTree("/" + ns + "/a/b", limitTrack1, usageTrack1, dt);
+
+        // validate the quota metrics while only the first quota subtree exists
+        validateQuotaMetrics(ns, countHardLimit1, bytesHardLimit1, countUsage1, bytesUsage1, nameSuffix);
+
+        // update the data tree with another quota subtree (/<ns>/a/c/d) under the same top namespace
+        final long countLimit2 = 20;
+        final long bytesLimit2 = 200;
+        final long countHardLimit2 = 10;
+        final long bytesHardLimit2 = 100;
+
+        final long countUsage2 = 9;
+        final long bytesUsage2 = 80;
+
+        final StatsTrack limitTrack2 = buildLimitStatsTrack(countLimit2, bytesLimit2, countHardLimit2, bytesHardLimit2);
+        final StatsTrack usageTrack2 = buildUsageStatsTrack(countUsage2, bytesUsage2);
+
+        buildDataTree("/" + ns + "/a/c/d", limitTrack2, usageTrack2, dt);
+
+        // validate the quota metrics: limits and usages of both subtrees are expected to be summed per top namespace
+        validateQuotaMetrics(ns, countHardLimit1 + countHardLimit2, bytesHardLimit1 + bytesHardLimit2,
+                countUsage1 + countUsage2, bytesUsage1 + bytesUsage2, nameSuffix);
+    }
+
+    @Test
+    public void testQuotaMetrics_noUsage() throws Exception {
+        // register the quota gauge sets under a unique name suffix
+        final String nameSuffix = UUID.randomUUID().toString();
+        final DataTree dt = new DataTree();
+        registerQuotaMetrics(nameSuffix, dt);
+
+        // build the data tree with no hard limits (-1) and no child nodes
+        final String ns = UUID.randomUUID().toString();
+
+        final long countLimit = 20;
+        final long bytesLimit = 200;
+        final long countHardLimit = -1;
+        final long bytesHardLimit = -1;
+
+        final long countUsage = 1;  // the node itself is always counted
+        final long bytesUsage = 0;
+
+        final StatsTrack limitTrack = buildLimitStatsTrack(countLimit, bytesLimit, countHardLimit, bytesHardLimit);
+        final StatsTrack usageTrack = buildUsageStatsTrack(countUsage, bytesUsage);
+        buildDataTree("/" + ns, limitTrack, usageTrack, dt);
+
+        // validate the quota: hard limits are -1, so countLimit/bytesLimit are the expected limit values
+        validateQuotaMetrics(ns, countLimit, bytesLimit, countUsage, bytesUsage, nameSuffix);
+    }
+
+    @Test
+    public void testQuotaMetrics_nullDataTree() {
+        // register the quota gauge sets against a null data tree
+        final String nameSuffix = UUID.randomUUID().toString();
+        registerQuotaMetrics(nameSuffix, null);
+
+        // validate the quota: no value is expected for any of the four metrics
+        validateQuotaMetrics(UUID.randomUUID().toString(), null, null, null, null, nameSuffix);
+    }
+
+    @Test
+    public void testQuotaMetrics_emptyDataTree() {
+        // register the quota gauge sets against a freshly created data tree
+        final String nameSuffix = UUID.randomUUID().toString();
+        registerQuotaMetrics(nameSuffix, new DataTree());
+
+        // validate the quota: no value is expected for any of the four metrics
+        validateQuotaMetrics(UUID.randomUUID().toString(), null, null, null, null, nameSuffix);
+    }
+
+    @Test
+    public void testShouldCollect_limitPath() { // a limit path is collected for the two LIMIT types only
+        final String limitPath = Quotas.quotaPath("/ns1") + QuotaMetricsUtils.LIMIT_END_STRING;
+
+        assertTrue(QuotaMetricsUtils.shouldCollect(limitPath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_LIMIT));
+        assertTrue(QuotaMetricsUtils.shouldCollect(limitPath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_LIMIT));
+
+        assertFalse(QuotaMetricsUtils.shouldCollect(limitPath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_USAGE));
+        assertFalse(QuotaMetricsUtils.shouldCollect(limitPath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_USAGE));
+    }
+
+    @Test
+    public void testShouldCollect_usagePath() { // a stats path is collected for the two USAGE types only
+        final String usagePath = Quotas.quotaPath("/ns1") + QuotaMetricsUtils.STATS_END_STRING;
+
+        assertTrue(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_USAGE));
+        assertTrue(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_USAGE));
+
+        assertFalse(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_LIMIT));
+        assertFalse(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_LIMIT));
+    }
+
+    @Test
+    public void testShouldCollect_notLimitOrUsagePath() { // any other path under the quota tree is collected for no type
+        final String usagePath = Quotas.quotaPath("/ns1") + "/notLimitOrUsage"; // despite the variable name, this is neither a limit nor a usage path
+
+        assertFalse(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_USAGE));
+        assertFalse(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_USAGE));
+
+        assertFalse(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_COUNT_LIMIT));
+        assertFalse(QuotaMetricsUtils.shouldCollect(usagePath, QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_LIMIT));
+    }
+
+    @Test
+    public void testGetQuotaLimit() {
+        assertEquals(0L, QuotaMetricsUtils.getQuotaLimit(0L, -1L)); // hard limit 0 is used as is
+        assertEquals(1L, QuotaMetricsUtils.getQuotaLimit(-1L, 1L)); // hard limit -1: the second argument is returned
+        assertEquals(0L, QuotaMetricsUtils.getQuotaLimit(-2L, 0L)); // hard limit below -1: the second argument is returned
+    }
+
+    @Test
+    public void testCollectQuotaMetrics_noData() {
+        final Map<String, Number> metricsMap = new HashMap<>();
+
+        QuotaMetricsUtils.collectQuotaLimitOrUsage(Quotas.quotaPath("/ns1") + QuotaMetricsUtils.LIMIT_END_STRING,
+                                        new DataNode(new byte[0], null, null), // empty but non-null data
+                                        metricsMap,
+                                        QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_LIMIT);
+
+        assertEquals(1, metricsMap.size()); // an entry is still recorded for the namespace
+        final Map.Entry<String, Number> entry = metricsMap.entrySet().iterator().next();
+        assertEquals("ns1", entry.getKey());
+        assertEquals(-1L,  entry.getValue().longValue()); // the bytes limit expected for empty data is -1
+    }
+
+    @Test
+    public void testCollectQuotaMetrics_nullData() {
+        final Map<String, Number> metricsMap = new HashMap<>();
+
+        QuotaMetricsUtils.collectQuotaLimitOrUsage(Quotas.quotaPath("/ns1") + QuotaMetricsUtils.LIMIT_END_STRING,
+                new DataNode(null, null, null), // node without data
+                metricsMap,
+                QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_LIMIT);
+
+        assertEquals(0, metricsMap.size()); // nothing is collected
+    }
+
+    @Test
+    public void testCollectQuotaMetrics_noNamespace() {
+        final Map<String, Number> metricsMap = new HashMap<>();
+
+        QuotaMetricsUtils.collectQuotaLimitOrUsage("/zookeeper/quota", // path with nothing below the quota root
+                new DataNode(null, null, null),
+                metricsMap,
+                QuotaMetricsUtils.QUOTA_LIMIT_USAGE_METRIC_TYPE.QUOTA_BYTES_USAGE);
+
+        assertEquals(0, metricsMap.size()); // nothing is collected
+    }
+
+    private void registerQuotaMetrics(final String nameSuffix, final DataTree dt) { // registers the four limit/usage gauge sets backed by dt
+        final MetricsProvider metricProvider = ServerMetrics.getMetrics().getMetricsProvider();
+        final MetricsContext rootContext = metricProvider.getRootContext();
+
+        // a random UUID is appended as the name suffix so that a GaugeSet is not overwritten when registering with the same name
+        rootContext.registerGaugeSet(
+                QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE + nameSuffix, () -> QuotaMetricsUtils.getQuotaCountLimit(dt));
+        rootContext.registerGaugeSet(
+                QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE + nameSuffix, () -> QuotaMetricsUtils.getQuotaBytesLimit(dt));
+        rootContext.registerGaugeSet(
+                QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE + nameSuffix, () -> QuotaMetricsUtils.getQuotaCountUsage(dt));
+        rootContext.registerGaugeSet(
+                QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE + nameSuffix, () -> QuotaMetricsUtils.getQuotaBytesUsage(dt));
+    }
+
+    private StatsTrack buildLimitStatsTrack(final long countLimit, // builds a StatsTrack carrying the count/bytes limits and hard limits
+                                            final long bytesLimit,
+                                            final long countHardLimit,
+                                            final long bytesHardLimit) {
+        final StatsTrack limitTrack = new StatsTrack();
+        limitTrack.setCount(countLimit);
+        limitTrack.setBytes(bytesLimit);
+        limitTrack.setCountHardLimit(countHardLimit);
+        limitTrack.setByteHardLimit(bytesHardLimit);
+        return limitTrack;
+    }
+
+    private StatsTrack buildUsageStatsTrack(final long countUsage, // builds a StatsTrack carrying only the count/bytes usage
+                                            final long bytesUsage) {
+        final StatsTrack usageTrack = new StatsTrack();
+        usageTrack.setCount(countUsage);
+        usageTrack.setBytes(bytesUsage);
+
+        return usageTrack;
+    }
+
+    private void buildDataTree(final String path, // creates path with child nodes, then the limit and stats nodes of its quota
+                               final StatsTrack limitTrack,
+                               final StatsTrack usageTrack,
+                               final DataTree dataTree) throws Exception {
+
+        // create the nodes along path, then the child data nodes that make up the usage
+        buildAncestors(path, dataTree);
+        int childCount = (int) usageTrack.getCount() - 1; // the usage count includes the node at path itself
+        if (childCount > 0) {
+            int dataBytes = (int) usageTrack.getBytes() / childCount; // usage bytes divided over the children (integer division)
+            for (int i = 0; i < childCount; i++) {
+                dataTree.createNode(path + "/n_" + i, new byte[dataBytes], null, -1, 1, 1, 1);
+            }
+        }
+
+        // create the quota tree: the nodes along the quota path, then the limit and stats nodes
+        buildAncestors(Quotas.quotaPath(path), dataTree);
+
+        final String limitPath = Quotas.limitPath(path);
+        dataTree.createNode(limitPath, limitTrack.getStatsBytes(), null, -1, 1, 1, 1);
+        assertEquals(limitTrack, new StatsTrack(dataTree.getNode(limitPath).getData())); // the stored bytes parse back to the same track
+
+        final String usagePath = Quotas.statPath(path);
+        dataTree.createNode(usagePath, usageTrack.getStatsBytes(), null, -1, 1, 1, 1);
+        assertEquals(usageTrack, new StatsTrack(dataTree.getNode(usagePath).getData())); // the stored bytes parse back to the same track
+    }
+
+    private void buildAncestors(final String path, final DataTree dataTree) throws Exception { // creates every node along path, including path itself
+        final String[] parts = path.split("/");
+        String nodePath = "";
+
+        for (int i = 1; i < parts.length; i++) { // for a path starting with "/", parts[0] is the empty string and is skipped
+            nodePath = nodePath + "/" + parts[i];
+            try {
+                dataTree.createNode(nodePath, null, null, -1, 1, 1, 1);
+            } catch (final KeeperException.NodeExistsException e) {
+                // ignored: the node already exists
+            }
+        }
+    }
+
+    private void validateQuotaMetrics(final String namespace, // compares the four expected values (null = metric absent) with the current server metrics
+                                      final Long countLimit,
+                                      final Long bytesLimit,
+                                      final Long countUsage,
+                                      final Long bytesUsage,
+                                      final String nameSuffix) {
+        final Map<String, Object> values = MetricsUtils.currentServerMetrics(); // keys looked up below are <namespace>_<metric name><nameSuffix>
+        assertEquals(countLimit, values.get(namespace + "_" + QuotaMetricsUtils.QUOTA_COUNT_LIMIT_PER_NAMESPACE + nameSuffix));
+        assertEquals(bytesLimit, values.get(namespace + "_" + QuotaMetricsUtils.QUOTA_BYTES_LIMIT_PER_NAMESPACE + nameSuffix));
+        assertEquals(countUsage, values.get(namespace + "_" + QuotaMetricsUtils.QUOTA_COUNT_USAGE_PER_NAMESPACE + nameSuffix));
+        assertEquals(bytesUsage, values.get(namespace + "_" + QuotaMetricsUtils.QUOTA_BYTES_USAGE_PER_NAMESPACE + nameSuffix));
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/EnforceQuotaTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EnforceQuotaTest.java
index 470ce39..c489618 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/EnforceQuotaTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/EnforceQuotaTest.java
@@ -19,6 +19,7 @@
 package org.apache.zookeeper.test;
 
 import static org.junit.Assert.fail;
+import java.util.UUID;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.StatsTrack;
@@ -54,7 +55,8 @@ public class EnforceQuotaTest extends ClientBase {
 
     @Test
     public void testSetQuotaDisableWhenExceedBytesHardQuota() throws Exception {
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "12345".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         StatsTrack st = new StatsTrack();
         st.setByteHardLimit(5L);
@@ -62,6 +64,7 @@ public class EnforceQuotaTest extends ClientBase {
 
         try {
             zk.setData(path, "123456".getBytes(), -1);
+            ZooKeeperQuotaTest.validateNoQuotaExceededMetrics(namespace);
         } catch (KeeperException.QuotaExceededException e) {
             fail("should not throw Byte Quota Exceeded Exception when enforce quota disables");
         }
@@ -70,7 +73,8 @@ public class EnforceQuotaTest extends ClientBase {
     @Test
     public void testSetQuotaDisableWhenExceedCountHardQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         int count = 2;
         StatsTrack st = new StatsTrack();
@@ -80,6 +84,7 @@ public class EnforceQuotaTest extends ClientBase {
 
         try {
             zk.create(path + "/c2" + "/c3", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            ZooKeeperQuotaTest.validateNoQuotaExceededMetrics(namespace);
         } catch (KeeperException.QuotaExceededException e) {
             fail("should not throw Count Quota Exceeded Exception when enforce quota disables");
         }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java
index aacd59e..bbc81c7 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ZooKeeperQuotaTest.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.QuotaExceededException;
@@ -40,7 +42,9 @@ import org.apache.zookeeper.cli.ListQuotaCommand;
 import org.apache.zookeeper.cli.MalformedPathException;
 import org.apache.zookeeper.cli.SetQuotaCommand;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.util.QuotaMetricsUtils;
 import org.apache.zookeeper.test.StatsTrackTest.OldStatsTrack;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -188,7 +192,8 @@ public class ZooKeeperQuotaTest extends ClientBase {
     @Test
     public void testSetQuotaWhenExceedBytesSoftQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT);
         StatsTrack st = new StatsTrack();
@@ -199,6 +204,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
 
         try {
             zk.setData(path, "123456".getBytes(), -1);
+            validateNoQuotaExceededMetrics(namespace);
         } catch (Exception e) {
             fail("should set data which exceeds the soft byte quota");
         }
@@ -207,7 +213,8 @@ public class ZooKeeperQuotaTest extends ClientBase {
     @Test
     public void testSetQuotaWhenExceedBytesHardQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "12345".getBytes(), Ids.OPEN_ACL_UNSAFE,
                 CreateMode.PERSISTENT);
         StatsTrack st = new StatsTrack();
@@ -219,13 +226,15 @@ public class ZooKeeperQuotaTest extends ClientBase {
             fail("should not set data which exceeds the hard byte quota");
         } catch (QuotaExceededException e) {
            //expected
+            validateQuotaExceededMetrics(namespace);
         }
     }
 
     @Test
     public void testSetQuotaWhenExceedBytesHardQuotaExtend() throws Exception {
 
-        String path = "/c0";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         int bytes = 100;
         StatsTrack st = new StatsTrack();
@@ -240,6 +249,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
                     fail("should not set quota when exceeds hard bytes quota");
                 } catch (QuotaExceededException e) {
                     //expected
+                    validateQuotaExceededMetrics(namespace);
                 }
             } else {
                 zk.create(sb.toString(), "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -250,7 +260,8 @@ public class ZooKeeperQuotaTest extends ClientBase {
     @Test
     public void testSetQuotaWhenSetQuotaLessThanExistBytes() throws Exception {
 
-        String path = "/c0";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "123456789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         int bytes = 5;
         StatsTrack st = new StatsTrack();
@@ -261,50 +272,56 @@ public class ZooKeeperQuotaTest extends ClientBase {
             fail("should not set quota when exceeds hard bytes quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
     }
 
     @Test
     public void testSetQuotaWhenSetChildDataExceedBytesQuota() throws Exception {
 
-        final String path = "/test/quota";
-        zk.create("/test", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/test/quota", "01234".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/test/quota/data", "56789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace + "/quota";
+        zk.create("/" + namespace, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create(path, "01234".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create(path + "/data", "56789".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
         StatsTrack quota = new StatsTrack();
         quota.setByteHardLimit(10);
         SetQuotaCommand.createQuota(zk, path, quota);
         try {
-            zk.setData("/test/quota/data", "567891".getBytes(), -1);
+            zk.setData(path + "/data", "567891".getBytes(), -1);
             fail("should not set data when exceed hard byte quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
     }
 
     @Test
     public void testSetQuotaWhenCreateNodeExceedBytesQuota() throws Exception {
 
-        final String path = "/test/quota";
-        zk.create("/test", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-        zk.create("/test/quota", "01234".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace + "/quota";
+        zk.create("/" + namespace, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.create(path, "01234".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
 
         StatsTrack quota = new StatsTrack();
         quota.setByteHardLimit(10);
         SetQuotaCommand.createQuota(zk, path, quota);
         try {
-            zk.create("/test/quota/data", "567891".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create(path + "/data", "567891".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             fail("should not set data when exceed hard byte quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
     }
 
     @Test
     public void testSetQuotaWhenExceedCountSoftQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         int count = 2;
         StatsTrack st = new StatsTrack();
@@ -314,6 +331,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
 
         try {
             zk.create(path + "/c2" + "/c3", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            validateNoQuotaExceededMetrics(namespace);
         } catch (QuotaExceededException e) {
             fail("should set quota when exceeds soft count quota");
         }
@@ -322,7 +340,8 @@ public class ZooKeeperQuotaTest extends ClientBase {
     @Test
     public void testSetQuotaWhenExceedCountHardQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         int count = 2;
         StatsTrack st = new StatsTrack();
@@ -335,13 +354,15 @@ public class ZooKeeperQuotaTest extends ClientBase {
             fail("should not set quota when exceeds hard count quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
     }
 
     @Test
     public void testSetQuotaWhenExceedCountHardQuotaExtend() throws Exception {
 
-        String path = "/c0";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         int count = 100;
         StatsTrack st = new StatsTrack();
@@ -356,6 +377,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
                     fail("should not set quota when exceeds hard count quota");
                 } catch (QuotaExceededException e) {
                     //expected
+                    validateQuotaExceededMetrics(namespace);
                 }
             } else {
                 zk.create(sb.toString(), "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -366,7 +388,8 @@ public class ZooKeeperQuotaTest extends ClientBase {
     @Test
     public void testSetQuotaWhenSetQuotaLessThanExistCount() throws Exception {
 
-        String path = "/c0";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         zk.create(path + "/c1", "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         zk.create(path + "/c2", "1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -379,13 +402,15 @@ public class ZooKeeperQuotaTest extends ClientBase {
             fail("should not set quota when exceeds hard count quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
     }
 
     @Test
     public void testSetQuotaWhenExceedBothBytesAndCountHardQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "12345".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         StatsTrack st = new StatsTrack();
         st.setByteHardLimit(5L);
@@ -397,6 +422,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
             fail("should give priority to CountQuotaExceededException when both meets the count and bytes quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
     }
 
@@ -450,7 +476,8 @@ public class ZooKeeperQuotaTest extends ClientBase {
     @Test
     public void testDeleteBytesQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "12345".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         StatsTrack st = new StatsTrack();
         st.setByteHardLimit(5L);
@@ -461,6 +488,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
             fail("should not set data which exceeds the hard byte quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
 
         //delete the Byte Hard Quota
@@ -469,12 +497,14 @@ public class ZooKeeperQuotaTest extends ClientBase {
         DelQuotaCommand.delQuota(zk, path, st);
 
         zk.setData(path, "123456".getBytes(), -1);
+        validateQuotaExceededMetrics(namespace);
     }
 
     @Test
     public void testDeleteCountQuota() throws Exception {
 
-        final String path = "/c1";
+        final String namespace = UUID.randomUUID().toString();
+        final String path = "/" + namespace;
         zk.create(path, "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         int count = 2;
         StatsTrack st = new StatsTrack();
@@ -487,6 +517,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
             fail("should not set quota when exceeds hard count quota");
         } catch (QuotaExceededException e) {
             //expected
+            validateQuotaExceededMetrics(namespace);
         }
 
         //delete the Count Hard Quota
@@ -495,6 +526,7 @@ public class ZooKeeperQuotaTest extends ClientBase {
         DelQuotaCommand.delQuota(zk, path, st);
 
         zk.create(path + "/c2" + "/c3", "data".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        validateQuotaExceededMetrics(namespace);
     }
 
     @Test
@@ -552,4 +584,20 @@ public class ZooKeeperQuotaTest extends ClientBase {
             }
         }
     }
+
+    private void validateQuotaExceededMetrics(final String namespace) {
+        // Key of the per-namespace quota-exceeded metric in the map returned by currentServerMetrics().
+        final String metricKey = String.format("%s_%s", namespace, QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE);
+        final Map<String, Object> serverMetrics = MetricsUtils.currentServerMetrics();
+
+        // Exactly one key may mention this namespace's metric, and its value must be 1.
+        assertEquals(1, serverMetrics.keySet().stream().filter(k -> k.contains(metricKey)).count());
+        assertEquals(1L, serverMetrics.get(metricKey));
+    }
+
+    static void validateNoQuotaExceededMetrics(final String namespace) {
+        // No key in the current server metrics may mention this namespace's quota-exceeded metric.
+        final String metricKey = String.format("%s_%s", namespace, QuotaMetricsUtils.QUOTA_EXCEEDED_ERROR_PER_NAMESPACE);
+        assertEquals(0, MetricsUtils.currentServerMetrics().keySet().stream().filter(k -> k.contains(metricKey)).count());
+    }
 }