You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/20 11:01:49 UTC

[pulsar] branch master updated: [monitoring][broker][metadata] add metadata store metrics (#17041)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 37f0a2da9b0 [monitoring][broker][metadata] add metadata store metrics (#17041)
37f0a2da9b0 is described below

commit 37f0a2da9b0749313c6dbadd779377a51a34f0de
Author: Tao Jiuming <95...@users.noreply.github.com>
AuthorDate: Tue Sep 20 19:01:41 2022 +0800

    [monitoring][broker][metadata] add metadata store metrics (#17041)
---
 .../rackawareness/BookieRackAffinityMapping.java   |   1 +
 .../pulsar/broker/resources/PulsarResources.java   |  12 +-
 .../apache/pulsar/PulsarClusterMetadataSetup.java  |  19 ++-
 .../pulsar/PulsarClusterMetadataTeardown.java      |   8 +-
 .../apache/pulsar/PulsarInitialNamespaceSetup.java |   2 +-
 .../PulsarTransactionCoordinatorMetadataSetup.java |   2 +-
 .../org/apache/pulsar/broker/PulsarService.java    |   2 +
 .../apache/pulsar/compaction/CompactorTool.java    |   1 +
 .../broker/auth/MockedPulsarServiceBaseTest.java   |  14 ++-
 .../broker/stats/MetadataStoreStatsTest.java       | 140 +++++++++++++++++++++
 .../broker/zookeeper/ClusterMetadataSetupTest.java |   4 +-
 .../websocket/proxy/ProxyAuthenticationTest.java   |   2 +-
 .../websocket/proxy/ProxyAuthorizationTest.java    |   2 +-
 .../websocket/proxy/ProxyConfigurationTest.java    |   2 +-
 .../proxy/ProxyEncryptionPublishConsumeTest.java   |   2 +-
 .../websocket/proxy/ProxyPublishConsumeTest.java   |   2 +-
 .../proxy/ProxyPublishConsumeTlsTest.java          |   2 +-
 .../proxy/ProxyPublishConsumeWithoutZKTest.java    |   2 +-
 .../proxy/v1/V1_ProxyAuthenticationTest.java       |   2 +-
 .../PulsarMetadataStateStoreProviderImpl.java      |   3 +-
 .../org/apache/pulsar/functions/worker/Worker.java |   2 +-
 pulsar-metadata/pom.xml                            |   5 +
 .../pulsar/metadata/api/MetadataStoreConfig.java   |   9 ++
 .../bookkeeper/AbstractMetadataDriver.java         |   1 +
 .../pulsar/metadata/bookkeeper/BKCluster.java      |   3 +-
 .../metadata/impl/AbstractMetadataStore.java       |  71 +++++++++--
 .../metadata/impl/LocalMemoryMetadataStore.java    |   1 +
 .../pulsar/metadata/impl/RocksdbMetadataStore.java |   1 +
 .../batching/AbstractBatchedMetadataStore.java     |   2 +-
 .../metadata/impl/stats/MetadataStoreStats.java    | 110 ++++++++++++++++
 .../pulsar/metadata/impl/stats/package-info.java   |  19 +++
 .../apache/pulsar/proxy/server/ProxyService.java   |   4 +-
 .../pulsar/sql/presto/PulsarConnectorCache.java    |   2 +-
 .../pulsar/testclient/ManagedLedgerWriter.java     |   2 +-
 .../apache/pulsar/websocket/WebSocketService.java  |   6 +-
 35 files changed, 417 insertions(+), 45 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
index c0c29637114..dab42558d01 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMapping.java
@@ -98,6 +98,7 @@ public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
                 int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
                 store = MetadataStoreExtended.create(url,
                         MetadataStoreConfig.builder()
+                                .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                                 .sessionTimeoutMillis(zkTimeout)
                                 .build());
             } catch (MetadataStoreException e) {
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
index a087d8090d3..9cfeddd123d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java
@@ -88,9 +88,17 @@ public class PulsarResources {
         this.configurationMetadataStore = Optional.ofNullable(configurationMetadataStore);
     }
 
-    public static MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+    public static MetadataStoreExtended createLocalMetadataStore(String serverUrls, int sessionTimeoutMs)
             throws MetadataStoreException {
         return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
-                .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false).build());
+                .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
+                .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
+    }
+
+    public static MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs)
+            throws MetadataStoreException {
+        return MetadataStoreExtended.create(serverUrls, MetadataStoreConfig.builder()
+                .sessionTimeoutMillis(sessionTimeoutMs).allowReadOnlyOperations(false)
+                .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
     }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index a06378d20ff..a95e6121b3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -248,8 +248,8 @@ public class PulsarClusterMetadataSetup {
                 arguments.metadataStoreUrl, arguments.configurationMetadataStore);
 
         MetadataStoreExtended localStore =
-                initMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
-        MetadataStoreExtended configStore = initMetadataStore(arguments.configurationMetadataStore,
+                initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
+        MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore,
                 arguments.zkSessionTimeoutMillis);
 
         final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl
@@ -389,9 +389,22 @@ public class PulsarClusterMetadataSetup {
         }
     }
 
-    public static MetadataStoreExtended initMetadataStore(String connection, int sessionTimeout) throws Exception {
+    public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception {
         MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
                 .sessionTimeoutMillis(sessionTimeout)
+                .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+                .build());
+        if (store instanceof MetadataStoreLifecycle) {
+            ((MetadataStoreLifecycle) store).initializeCluster().get();
+        }
+        return store;
+    }
+
+    public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout)
+            throws Exception {
+        MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
+                .sessionTimeoutMillis(sessionTimeout)
+                .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
                 .build());
         if (store instanceof MetadataStoreLifecycle) {
             ((MetadataStoreLifecycle) store).initializeCluster().get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
index d0da6a5a2bd..d0ba5c47f75 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataTeardown.java
@@ -99,7 +99,10 @@ public class PulsarClusterMetadataTeardown {
 
         @Cleanup
         MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.zookeeper,
-                MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
+                MetadataStoreConfig.builder()
+                        .sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
+                        .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+                        .build());
 
         if (arguments.bkMetadataServiceUri != null) {
             @Cleanup
@@ -121,7 +124,8 @@ public class PulsarClusterMetadataTeardown {
             // Should it be done by REST API before broker is down?
             @Cleanup
             MetadataStore configMetadataStore = MetadataStoreFactory.create(arguments.configurationStore,
-                    MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis).build());
+                    MetadataStoreConfig.builder().sessionTimeoutMillis(arguments.zkSessionTimeoutMillis)
+                            .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
             deleteRecursively(configMetadataStore, "/admin/clusters/" + arguments.cluster).join();
         }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java
index 15e4792fd97..bb27be60b35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarInitialNamespaceSetup.java
@@ -83,7 +83,7 @@ public class PulsarInitialNamespaceSetup {
         }
 
         try (MetadataStore configStore = PulsarClusterMetadataSetup
-                .initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
+                .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
             PulsarResources pulsarResources = new PulsarResources(null, configStore);
             for (String namespace : arguments.namespaces) {
                 NamespaceName namespaceName = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
index 66607dd4c0a..b1fae8753ec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarTransactionCoordinatorMetadataSetup.java
@@ -91,7 +91,7 @@ public class PulsarTransactionCoordinatorMetadataSetup {
         }
 
         try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup
-                .initMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
+                .initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
             PulsarResources pulsarResources = new PulsarResources(null, configStore);
             // Create system tenant
             PulsarClusterMetadataSetup
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 750a56e0c86..c69c6e2418f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -362,6 +362,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                         .batchingMaxDelayMillis(config.getMetadataStoreBatchingMaxDelayMillis())
                         .batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
                         .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
+                        .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
                         .synchronizer(synchronizer)
                         .build());
     }
@@ -1045,6 +1046,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
                         .batchingMaxOperations(config.getMetadataStoreBatchingMaxOperations())
                         .batchingMaxSizeKb(config.getMetadataStoreBatchingMaxSizeKb())
                         .synchronizer(synchronizer)
+                        .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                         .build());
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
index a54135de9a8..4cb1fd347df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java
@@ -143,6 +143,7 @@ public class CompactorTool {
         MetadataStoreExtended store = MetadataStoreExtended.create(brokerConfig.getMetadataStoreUrl(),
                 MetadataStoreConfig.builder()
                         .sessionTimeoutMillis((int) brokerConfig.getMetadataStoreSessionTimeoutMillis())
+                        .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                         .build());
 
         @Cleanup
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index ba1b4885d76..f70b3b43ab9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -381,21 +381,27 @@ public abstract class MockedPulsarServiceBaseTest extends TestRetrySupport {
     }
 
     protected MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
-        return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+        return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
+                .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
+                .synchronizer(synchronizer).build());
     }
 
     protected MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
-        return new ZKMetadataStore(mockZooKeeper);
+        return new ZKMetadataStore(mockZooKeeper, MetadataStoreConfig.builder()
+                .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
     }
 
     protected MetadataStoreExtended createConfigurationMetadataStore(PulsarMetadataEventSynchronizer synchronizer) {
         return new ZKMetadataStore(mockZooKeeperGlobal,
-                MetadataStoreConfig.builder().synchronizer(synchronizer).build());
+                MetadataStoreConfig.builder()
+                        .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                        .synchronizer(synchronizer).build());
 
     }
 
     protected MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
-        return new ZKMetadataStore(mockZooKeeperGlobal);
+        return new ZKMetadataStore(mockZooKeeperGlobal, MetadataStoreConfig.builder()
+                .metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE).build());
     }
 
     private void mockConfigBrokerInterceptors(PulsarService pulsarService) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
new file mode 100644
index 00000000000..eba134a2c8d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/MetadataStoreStatsTest.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import com.google.common.collect.Multimap;
+import java.io.ByteArrayOutputStream;
+import java.util.Collection;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.metadata.api.MetadataStoreConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+@Test(groups = "broker")
+public class MetadataStoreStatsTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(false);
+        conf.setSystemTopicEnabled(false);
+        super.baseSetup();
+        AuthenticationProviderToken.resetMetrics();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+        resetConfig();
+    }
+
+    @Test
+    public void testMetadataStoreStats() throws Exception {
+        String ns = "prop/ns-abc1";
+        admin.namespaces().createNamespace(ns);
+
+        String topic = "persistent://prop/ns-abc1/metadata-store-" + UUID.randomUUID();
+        String subName = "my-sub1";
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic).create();
+        @Cleanup
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic).subscriptionName(subName).subscribe();
+
+        for (int i = 0; i < 100; i++) {
+            producer.newMessage().value(UUID.randomUUID().toString()).send();
+        }
+
+        for (;;) {
+            Message<String> message = consumer.receive(10, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            consumer.acknowledge(message);
+        }
+
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, false, false, output);
+        String metricsStr = output.toString();
+        Multimap<String, PrometheusMetricsTest.Metric> metricsMap = PrometheusMetricsTest.parseMetrics(metricsStr);
+
+        Collection<PrometheusMetricsTest.Metric> opsLatency = metricsMap.get("pulsar_metadata_store_ops_latency_ms" + "_sum");
+        Collection<PrometheusMetricsTest.Metric> putBytes = metricsMap.get("pulsar_metadata_store_put_bytes" + "_total");
+
+        Assert.assertTrue(opsLatency.size() > 1);
+        Assert.assertTrue(putBytes.size() > 1);
+
+        for (PrometheusMetricsTest.Metric m : opsLatency) {
+            Assert.assertEquals(m.tags.get("cluster"), "test");
+            String metadataStoreName = m.tags.get("name");
+            Assert.assertNotNull(metadataStoreName);
+            Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+            Assert.assertNotNull(m.tags.get("status"));
+
+            if (m.tags.get("status").equals("success")) {
+                if (m.tags.get("type").equals("get")) {
+                    Assert.assertTrue(m.value >= 0);
+                } else if (m.tags.get("type").equals("del")) {
+                    Assert.assertTrue(m.value >= 0);
+                } else if (m.tags.get("type").equals("put")) {
+                    Assert.assertTrue(m.value >= 0);
+                } else {
+                    Assert.fail();
+                }
+            } else {
+                if (m.tags.get("type").equals("get")) {
+                    Assert.assertTrue(m.value >= 0);
+                } else if (m.tags.get("type").equals("del")) {
+                    Assert.assertTrue(m.value >= 0);
+                } else if (m.tags.get("type").equals("put")) {
+                    Assert.assertTrue(m.value >= 0);
+                } else {
+                    Assert.fail();
+                }
+            }
+        }
+        for (PrometheusMetricsTest.Metric m : putBytes) {
+            Assert.assertEquals(m.tags.get("cluster"), "test");
+            String metadataStoreName = m.tags.get("name");
+            Assert.assertNotNull(metadataStoreName);
+            Assert.assertTrue(metadataStoreName.equals(MetadataStoreConfig.METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
+                    || metadataStoreName.equals(MetadataStoreConfig.STATE_METADATA_STORE));
+            Assert.assertTrue(m.value > 0);
+        }
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
index 84a42aace85..75b385dba4d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ClusterMetadataSetupTest.java
@@ -251,7 +251,7 @@ public class ClusterMetadataSetupTest {
         PulsarClusterMetadataSetup.main(args);
 
         try (MetadataStoreExtended localStore = PulsarClusterMetadataSetup
-                .initMetadataStore(zkConnection, 30000)) {
+                .initLocalMetadataStore(zkConnection, 30000)) {
             // expected not exist
             assertFalse(localStore.exists("/ledgers").get());
 
@@ -268,7 +268,7 @@ public class ClusterMetadataSetupTest {
 
             PulsarClusterMetadataSetup.main(bookkeeperMetadataServiceUriArgs);
             try (MetadataStoreExtended bookkeeperMetadataServiceUriStore = PulsarClusterMetadataSetup
-                    .initMetadataStore(zkConnection, 30000)) {
+                    .initLocalMetadataStore(zkConnection, 30000)) {
                 // expected not exist
                 assertFalse(bookkeeperMetadataServiceUriStore.exists("/ledgers").get());
             }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
index a34ec879ba6..35a7aa44391 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java
@@ -83,7 +83,7 @@ public class ProxyAuthenticationTest extends ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
index 7e0ee1bd466..982f4cd48eb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthorizationTest.java
@@ -64,7 +64,7 @@ public class ProxyAuthorizationTest extends MockedPulsarServiceBaseTest {
         config.setWebServicePort(Optional.of(0));
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
         service.start();
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
index 26cf8a0e154..e6eba5e32cb 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyConfigurationTest.java
@@ -67,7 +67,7 @@ public class ProxyConfigurationTest extends ProducerConsumerBase {
         config.setServiceUrl("http://localhost:8080");
         config.getProperties().setProperty("brokerClient_lookupTimeoutMs", "100");
         WebSocketService service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
         service.start();
 
         PulsarClientImpl client = (PulsarClientImpl) service.getPulsarClient();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
index bbd2b3bd14f..87741e5bede 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyEncryptionPublishConsumeTest.java
@@ -79,7 +79,7 @@ public class ProxyEncryptionPublishConsumeTest extends ProducerConsumerBase {
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         config.setCryptoKeyReaderFactoryClassName(CryptoKeyReaderFactoryImpl.class.getName());
         WebSocketService service = spy(new WebSocketService(config));
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
index 918640642ec..951a3db4f02 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java
@@ -101,7 +101,7 @@ public class ProxyPublishConsumeTest extends ProducerConsumerBase {
         config.setClusterName("test");
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
index a8b67416107..5e40a7535a7 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java
@@ -74,7 +74,7 @@ public class ProxyPublishConsumeTlsTest extends TlsProducerConsumerBase {
         config.setBrokerClientAuthenticationPlugin(AuthenticationTls.class.getName());
         config.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
index 1fb12645e5e..c4f4e876c69 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java
@@ -62,7 +62,7 @@ public class ProxyPublishConsumeWithoutZKTest extends ProducerConsumerBase {
         config.setServiceUrl(pulsar.getSafeWebServiceAddress());
         config.setServiceUrlTls(pulsar.getWebServiceAddressTls());
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeper)).when(service).createConfigMetadataStore(anyString(), anyInt());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
index b80c3fb07be..09a7d3a90fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java
@@ -85,7 +85,7 @@ public class V1_ProxyAuthenticationTest extends V1_ProducerConsumerBase {
         }
 
         service = spyWithClassAndConstructorArgs(WebSocketService.class, config);
-        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createMetadataStore(anyString(), anyInt());
+        doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(service).createConfigMetadataStore(anyString(), anyInt());
         proxyServer = new ProxyServer(config);
         WebSocketServiceStarter.start(proxyServer, service);
         log.info("Proxy Server Started");
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
index 819bfd94cb7..0fcc4c56b40 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreProviderImpl.java
@@ -47,7 +47,8 @@ public class PulsarMetadataStateStoreProviderImpl implements StateStoreProvider
             shouldCloseStore = false;
         } else {
             String metadataUrl = (String) config.get(METADATA_URL);
-            store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
+            store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder()
+                    .metadataStoreName(MetadataStoreConfig.STATE_METADATA_STORE).build());
             shouldCloseStore = true;
         }
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 0ff6dbc0431..d4d4873737d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -74,7 +74,7 @@ public class Worker {
 
             log.info("starting configuration cache service");
             try {
-                configMetadataStore = PulsarResources.createMetadataStore(
+                configMetadataStore = PulsarResources.createConfigMetadataStore(
                         workerConfig.getConfigurationMetadataStoreUrl(),
                         (int) workerConfig.getMetadataStoreSessionTimeoutMillis());
             } catch (IOException e) {
diff --git a/pulsar-metadata/pom.xml b/pulsar-metadata/pom.xml
index b34271f7f32..7f8c3e93754 100644
--- a/pulsar-metadata/pom.xml
+++ b/pulsar-metadata/pom.xml
@@ -99,6 +99,11 @@
       <artifactId>caffeine</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>io.prometheus</groupId>
+      <artifactId>simpleclient</artifactId>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
index e1d0ede7d2a..f00cc431480 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStoreConfig.java
@@ -29,6 +29,9 @@ import lombok.ToString;
 @Getter
 @ToString
 public class MetadataStoreConfig {
+    public static final String METADATA_STORE = "metadata-store";
+    public static final String STATE_METADATA_STORE = "state-metadata-store";
+    public static final String CONFIGURATION_METADATA_STORE = "configuration-metadata-store";
 
     /**
      * The (implementation specific) session timeout, in milliseconds.
@@ -72,6 +75,12 @@ public class MetadataStoreConfig {
     @Builder.Default
     private final int batchingMaxSizeKb = 128;
 
+    /**
+     * The name of a metadata store.
+     */
+    @Builder.Default
+    private final String metadataStoreName = "";
+
     /**
      * Pluggable MetadataEventSynchronizer to sync metadata events across the
      * separate clusters.
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
index 76a14300d0b..8a9984d626d 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractMetadataDriver.java
@@ -115,6 +115,7 @@ public abstract class AbstractMetadataDriver implements Closeable {
                 this.store = MetadataStoreExtended.create(url,
                         MetadataStoreConfig.builder()
                                 .sessionTimeoutMillis(conf.getZkTimeout())
+                                .metadataStoreName(MetadataStoreConfig.METADATA_STORE)
                                 .build());
                 this.storeInstanceIsOwned = true;
             } catch (MetadataStoreException e) {
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
index 36b0be112e2..24fa4730854 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BKCluster.java
@@ -119,7 +119,8 @@ public class BKCluster implements AutoCloseable {
         this.baseClientConf = newBaseClientConfiguration();
 
         this.store =
-                MetadataStoreExtended.create(clusterConf.metadataServiceUri, MetadataStoreConfig.builder().build());
+                MetadataStoreExtended.create(clusterConf.metadataServiceUri, MetadataStoreConfig.builder()
+                        .metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
         baseConf.setJournalRemovePagesFromCache(false);
         baseConf.setProperty(AbstractMetadataDriver.METADATA_STORE_INSTANCE, store);
         baseClientConf.setProperty(AbstractMetadataDriver.METADATA_STORE_INSTANCE, store);
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
index f7e90dc8e60..96c97b31e51 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java
@@ -35,9 +35,9 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -59,18 +59,20 @@ import org.apache.pulsar.metadata.api.extended.CreateOption;
 import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
+import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
 
 @Slf4j
 public abstract class AbstractMetadataStore implements MetadataStoreExtended, Consumer<Notification> {
-
     private static final long CACHE_REFRESH_TIME_MILLIS = TimeUnit.MINUTES.toMillis(5);
 
     private final CopyOnWriteArrayList<Consumer<Notification>> listeners = new CopyOnWriteArrayList<>();
     private final CopyOnWriteArrayList<Consumer<SessionEvent>> sessionListeners = new CopyOnWriteArrayList<>();
+    protected final String metadataStoreName;
     protected final ScheduledExecutorService executor;
     private final AsyncLoadingCache<String, List<String>> childrenCache;
     private final AsyncLoadingCache<String, Boolean> existsCache;
     private final CopyOnWriteArrayList<MetadataCacheImpl<?>> metadataCaches = new CopyOnWriteArrayList<>();
+    private final MetadataStoreStats metadataStoreStats;
 
     // We don't strictly need to use 'volatile' here because we don't need the precise consistent semantic. Instead,
     // we want to avoid the overhead of 'volatile'.
@@ -81,9 +83,8 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     protected abstract CompletableFuture<Boolean> existsFromStore(String path);
 
-    protected AbstractMetadataStore() {
-        this.executor = Executors
-                .newSingleThreadScheduledExecutor(new DefaultThreadFactory("metadata-store"));
+    protected AbstractMetadataStore(String metadataStoreName) {
+        this.executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(metadataStoreName));
         registerListener(this);
 
         this.childrenCache = Caffeine.newBuilder()
@@ -127,6 +128,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                         }
                     }
                 });
+
+        this.metadataStoreName = metadataStoreName;
+        this.metadataStoreStats = new MetadataStoreStats(metadataStoreName);
     }
 
     protected void registerSyncLister(Optional<MetadataEventSynchronizer> synchronizer) {
@@ -236,10 +240,20 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     @Override
     public CompletableFuture<Optional<GetResult>> get(String path) {
+        long start = System.currentTimeMillis();
         if (!isValidPath(path)) {
-            return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
+            metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
+            return FutureUtil
+                    .failedFuture(new MetadataStoreException.InvalidPathException(path));
         }
-        return storeGet(path);
+        return storeGet(path)
+                .whenComplete((v, t) -> {
+                    if (t != null) {
+                        metadataStoreStats.recordGetOpsFailed(System.currentTimeMillis() - start);
+                    } else {
+                        metadataStoreStats.recordGetOpsSucceeded(System.currentTimeMillis() - start);
+                    }
+                });
     }
 
     protected abstract CompletableFuture<Optional<GetResult>> storeGet(String path);
@@ -314,7 +328,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
 
     @Override
     public final CompletableFuture<Void> delete(String path, Optional<Long> expectedVersion) {
+        long start = System.currentTimeMillis();
         if (!isValidPath(path)) {
+            metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
             return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
         }
         if (getMetadataEventSynchronizer().isPresent()) {
@@ -322,9 +338,23 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                     expectedVersion.orElse(null), Instant.now().toEpochMilli(),
                     getMetadataEventSynchronizer().get().getClusterName(), NotificationType.Deleted);
             return getMetadataEventSynchronizer().get().notify(event)
-                    .thenCompose(__ -> deleteInternal(path, expectedVersion));
+                    .thenCompose(__ -> deleteInternal(path, expectedVersion))
+                    .whenComplete((v, t) -> {
+                        if (null != t) {
+                            metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
+                        } else {
+                            metadataStoreStats.recordDelOpsSucceeded(System.currentTimeMillis() - start);
+                        }
+                    });
         } else {
-            return deleteInternal(path, expectedVersion);
+            return deleteInternal(path, expectedVersion)
+                    .whenComplete((v, t) -> {
+                        if (null != t) {
+                            metadataStoreStats.recordDelOpsFailed(System.currentTimeMillis() - start);
+                        } else {
+                            metadataStoreStats.recordDelOpsSucceeded(System.currentTimeMillis() - start);
+                        }
+                    });
         }
     }
 
@@ -364,7 +394,9 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
     @Override
     public final CompletableFuture<Stat> put(String path, byte[] data, Optional<Long> optExpectedVersion,
             EnumSet<CreateOption> options) {
+        long start = System.currentTimeMillis();
         if (!isValidPath(path)) {
+            metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
             return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path));
         }
         HashSet<CreateOption> ops = new HashSet<>(options);
@@ -375,9 +407,25 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
                     Instant.now().toEpochMilli(), getMetadataEventSynchronizer().get().getClusterName(),
                     NotificationType.Modified);
             return getMetadataEventSynchronizer().get().notify(event)
-                    .thenCompose(__ -> putInternal(path, data, optExpectedVersion, options));
+                    .thenCompose(__ -> putInternal(path, data, optExpectedVersion, options))
+                    .whenComplete((v, t) -> {
+                        if (t != null) {
+                            metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
+                        } else {
+                            int len = data == null ? 0 : data.length;
+                            metadataStoreStats.recordPutOpsSucceeded(System.currentTimeMillis() - start, len);
+                        }
+                    });
         } else {
-            return putInternal(path, data, optExpectedVersion, options);
+            return putInternal(path, data, optExpectedVersion, options)
+                    .whenComplete((v, t) -> {
+                        if (t != null) {
+                            metadataStoreStats.recordPutOpsFailed(System.currentTimeMillis() - start);
+                        } else {
+                            int len = data == null ? 0 : data.length;
+                            metadataStoreStats.recordPutOpsSucceeded(System.currentTimeMillis() - start, len);
+                        }
+                    });
         }
 
     }
@@ -428,6 +476,7 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
     public void close() throws Exception {
         executor.shutdownNow();
         executor.awaitTermination(10, TimeUnit.SECONDS);
+        this.metadataStoreStats.close();
     }
 
     @VisibleForTesting
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
index 924a6ac5d6d..b520745a406 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java
@@ -75,6 +75,7 @@ public class LocalMemoryMetadataStore extends AbstractMetadataStore implements M
 
     public LocalMemoryMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
             throws MetadataStoreException {
+        super(metadataStoreConfig.getMetadataStoreName());
         String name = metadataURL.substring(MEMORY_SCHEME_IDENTIFIER.length());
         // Local means a private data set
         // update synchronizer and register sync listener
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
index 69af0e76914..b276d3cb7a8 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java
@@ -216,6 +216,7 @@ public class RocksdbMetadataStore extends AbstractMetadataStore {
      */
     private RocksdbMetadataStore(String metadataURL, MetadataStoreConfig metadataStoreConfig)
             throws MetadataStoreException {
+        super(metadataStoreConfig.getMetadataStoreName());
         this.metadataUrl = metadataURL;
         try {
             RocksDB.loadLibrary();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
index c9d245b8caf..de7088d74af 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java
@@ -53,7 +53,7 @@ public abstract class AbstractBatchedMetadataStore extends AbstractMetadataStore
     private MetadataEventSynchronizer synchronizer;
 
     protected AbstractBatchedMetadataStore(MetadataStoreConfig conf) {
-        super();
+        super(conf.getMetadataStoreName());
 
         this.enabled = conf.isBatchingEnabled();
         this.maxDelayMillis = conf.getBatchingMaxDelayMillis();
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
new file mode 100644
index 00000000000..2351ba1a591
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/MetadataStoreStats.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
+
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public final class MetadataStoreStats implements AutoCloseable {
+    private static final double[] BUCKETS = new double[]{1, 3, 5, 10, 20, 50, 100, 200, 500, 1000, 2000, 5000};
+    private static final String OPS_TYPE_LABEL_NAME = "type";
+    private static final String METADATA_STORE_LABEL_NAME = "name";
+    private static final String STATUS = "status";
+
+    private static final String OPS_TYPE_GET = "get";
+    private static final String OPS_TYPE_DEL = "del";
+    private static final String OPS_TYPE_PUT = "put";
+    private static final String STATUS_SUCCESS = "success";
+    private static final String STATUS_FAIL = "fail";
+
+    protected static final String PREFIX = "pulsar_metadata_store_";
+
+    private static final Histogram OPS_LATENCY = Histogram
+            .build(PREFIX + "ops_latency", "-")
+            .unit("ms")
+            .buckets(BUCKETS)
+            .labelNames(METADATA_STORE_LABEL_NAME, OPS_TYPE_LABEL_NAME, STATUS)
+            .register();
+    private static final Counter PUT_BYTES = Counter
+            .build(PREFIX + "put", "-")
+            .unit("bytes")
+            .labelNames(METADATA_STORE_LABEL_NAME)
+            .register();
+
+    private final Histogram.Child getOpsSucceedChild;
+    private final Histogram.Child delOpsSucceedChild;
+    private final Histogram.Child putOpsSucceedChild;
+    private final Histogram.Child getOpsFailedChild;
+    private final Histogram.Child delOpsFailedChild;
+    private final Histogram.Child putOpsFailedChild;
+    private final Counter.Child putBytesChild;
+    private final String metadataStoreName;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    public MetadataStoreStats(String metadataStoreName) {
+        this.metadataStoreName = metadataStoreName;
+
+        this.getOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_GET, STATUS_SUCCESS);
+        this.delOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_DEL, STATUS_SUCCESS);
+        this.putOpsSucceedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_PUT, STATUS_SUCCESS);
+        this.getOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_GET, STATUS_FAIL);
+        this.delOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_DEL, STATUS_FAIL);
+        this.putOpsFailedChild = OPS_LATENCY.labels(metadataStoreName, OPS_TYPE_PUT, STATUS_FAIL);
+        this.putBytesChild = PUT_BYTES.labels(metadataStoreName);
+    }
+
+    public void recordGetOpsSucceeded(long millis) {
+        this.getOpsSucceedChild.observe(millis);
+    }
+
+    public void recordDelOpsSucceeded(long millis) {
+        this.delOpsSucceedChild.observe(millis);
+    }
+
+    public void recordPutOpsSucceeded(long millis, int bytes) {
+        this.putOpsSucceedChild.observe(millis);
+        this.putBytesChild.inc(bytes);
+    }
+
+    public void recordGetOpsFailed(long millis) {
+        this.getOpsFailedChild.observe(millis);
+    }
+
+    public void recordDelOpsFailed(long millis) {
+        this.delOpsFailedChild.observe(millis);
+    }
+
+    public void recordPutOpsFailed(long millis) {
+        this.putOpsFailedChild.observe(millis);
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (this.closed.compareAndSet(false, true)) {
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_GET, STATUS_SUCCESS);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_DEL, STATUS_SUCCESS);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_PUT, STATUS_SUCCESS);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_GET, STATUS_FAIL);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_DEL, STATUS_FAIL);
+            OPS_LATENCY.remove(this.metadataStoreName, OPS_TYPE_PUT, STATUS_FAIL);
+            PUT_BYTES.remove(this.metadataStoreName);
+        }
+    }
+}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java
new file mode 100644
index 00000000000..15ca0d1c582
--- /dev/null
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/stats/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.metadata.impl.stats;
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 8b8b474e5e3..cc1e4cbc880 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -447,12 +447,12 @@ public class ProxyService implements Closeable {
     }
 
     public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException {
-        return PulsarResources.createMetadataStore(proxyConfig.getMetadataStoreUrl(),
+        return PulsarResources.createLocalMetadataStore(proxyConfig.getMetadataStoreUrl(),
                 proxyConfig.getMetadataStoreSessionTimeoutMillis());
     }
 
     public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException {
-        return PulsarResources.createMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
+        return PulsarResources.createConfigMetadataStore(proxyConfig.getConfigurationMetadataStoreUrl(),
                 proxyConfig.getMetadataStoreSessionTimeoutMillis());
     }
 
diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
index f6a4771f9e4..852ed7f18e8 100644
--- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
+++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java
@@ -74,7 +74,7 @@ public class PulsarConnectorCache {
 
     private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
         this.metadataStore = MetadataStoreExtended.create(pulsarConnectorConfig.getZookeeperUri(),
-                MetadataStoreConfig.builder().build());
+                MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
         this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
         this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
                 StatsProvider.class, getClass().getClassLoader());
diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
index 73987c8b4a8..ab30e7dd4ec 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/ManagedLedgerWriter.java
@@ -193,7 +193,7 @@ public class ManagedLedgerWriter {
 
         @Cleanup
         MetadataStoreExtended metadataStore = MetadataStoreExtended.create(arguments.metadataStoreUrl,
-                MetadataStoreConfig.builder().build());
+                MetadataStoreConfig.builder().metadataStoreName(MetadataStoreConfig.METADATA_STORE).build());
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkConf, mlFactoryConf);
 
         ManagedLedgerConfig mlConf = new ManagedLedgerConfig();
diff --git a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
index f8b63e0641f..7e798eecfdd 100644
--- a/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
+++ b/pulsar-websocket/src/main/java/org/apache/pulsar/websocket/WebSocketService.java
@@ -106,7 +106,7 @@ public class WebSocketService implements Closeable {
 
         if (isNotBlank(config.getConfigurationMetadataStoreUrl())) {
             try {
-                configMetadataStore = createMetadataStore(config.getConfigurationMetadataStoreUrl(),
+                configMetadataStore = createConfigMetadataStore(config.getConfigurationMetadataStoreUrl(),
                         (int) config.getMetadataStoreSessionTimeoutMillis());
             } catch (MetadataStoreException e) {
                 throw new PulsarServerException(e);
@@ -140,9 +140,9 @@ public class WebSocketService implements Closeable {
         log.info("Pulsar WebSocket Service started");
     }
 
-    public MetadataStoreExtended createMetadataStore(String serverUrls, int sessionTimeoutMs)
+    public MetadataStoreExtended createConfigMetadataStore(String serverUrls, int sessionTimeoutMs)
             throws MetadataStoreException {
-        return PulsarResources.createMetadataStore(serverUrls, sessionTimeoutMs);
+        return PulsarResources.createConfigMetadataStore(serverUrls, sessionTimeoutMs);
     }
 
     @Override